Skip to content

Commit

Permalink
[RUST] Write List/LargeList/FixedSizeList/FixedSizeBinary (#421)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored Jan 9, 2023
1 parent 39ddad9 commit 25cf897
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 40 deletions.
49 changes: 34 additions & 15 deletions rust/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
//! Extensions to improve Arrow-RS ergonomics.
use arrow_array::{
Array, ArrayRef, FixedSizeBinaryArray, FixedSizeListArray, Int32Array, ListArray, RecordBatch,
UInt8Array,
Array, ArrayRef, FixedSizeBinaryArray, FixedSizeListArray, Int32Array, Int64Array,
LargeListArray, ListArray, RecordBatch, UInt8Array,
};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType, Field};
Expand Down Expand Up @@ -58,10 +58,8 @@ pub trait DataTypeExt {

impl DataTypeExt for DataType {
fn is_binary_like(&self) -> bool {
matches!(
self,
DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary
)
use DataType::*;
matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
}

fn is_struct(&self) -> bool {
Expand Down Expand Up @@ -105,19 +103,19 @@ pub trait ListArrayExt {
///
/// let offsets = Int32Array::from_iter([0, 2, 7, 10]);
/// let int_values = Int64Array::from_iter(0..10);
/// let list_arr = ListArray::new(int_values, &offsets).unwrap();
/// let list_arr = ListArray::try_new(int_values, &offsets).unwrap();
/// assert_eq!(list_arr,
/// ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
/// Some(vec![Some(0), Some(1)]),
/// Some(vec![Some(2), Some(3), Some(4), Some(5), Some(6)]),
/// Some(vec![Some(7), Some(8), Some(9)]),
/// ]))
/// ```
fn new<T: Array>(values: T, offsets: &Int32Array) -> Result<ListArray>;
fn try_new<T: Array>(values: T, offsets: &Int32Array) -> Result<ListArray>;
}

impl ListArrayExt for ListArray {
fn new<T: Array>(values: T, offsets: &Int32Array) -> Result<Self> {
fn try_new<T: Array>(values: T, offsets: &Int32Array) -> Result<Self> {
let data = ArrayDataBuilder::new(DataType::List(Box::new(Field::new(
"item",
values.data_type().clone(),
Expand All @@ -132,6 +130,27 @@ impl ListArrayExt for ListArray {
}
}

// TODO: merge with ListArrayExt?
/// Extension trait for constructing a [`LargeListArray`] from raw parts.
pub trait LargeListArrayExt {
/// Build a [`LargeListArray`] from a flat `values` array and 64-bit `offsets`.
///
/// The resulting array has `offsets.len() - 1` lists; each consecutive pair
/// of offsets delimits one list's slice of `values`. The offsets are
/// presumably zero-based and monotonically increasing — this is not
/// validated here (TODO confirm upstream callers guarantee it).
fn try_new<T: Array>(values: T, offsets: &Int64Array) -> Result<LargeListArray>;
}

impl LargeListArrayExt for LargeListArray {
fn try_new<T: Array>(values: T, offsets: &Int64Array) -> Result<Self> {
// Assemble the ArrayData by hand: a LargeList whose (nullable) "item"
// child carries the values' data type.
let data = ArrayDataBuilder::new(DataType::LargeList(Box::new(Field::new(
"item",
values.data_type().clone(),
true,
))))
// One list per consecutive offset pair, hence offsets.len() - 1 lists.
.len(offsets.len() - 1)
// Reuse the offsets array's backing buffer as the list-offset buffer.
.add_buffer(offsets.into_data().buffers()[0].clone())
// NOTE(review): `into_data()` already yields an owned ArrayData, so the
// trailing `.clone()` looks redundant — confirm against the arrow-rs
// version in use before removing.
.add_child_data(values.into_data().clone())
.build()?;

Ok(Self::from(data))
}
}

pub trait FixedSizeListArrayExt {
/// Create an [`FixedSizeListArray`] from values and list size.
///
Expand All @@ -141,7 +160,7 @@ pub trait FixedSizeListArrayExt {
/// use lance::arrow::FixedSizeListArrayExt;
///
/// let int_values = Int64Array::from_iter(0..10);
/// let fixed_size_list_arr = FixedSizeListArray::new(int_values, 2).unwrap();
/// let fixed_size_list_arr = FixedSizeListArray::try_new(int_values, 2).unwrap();
/// assert_eq!(fixed_size_list_arr,
/// FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(vec![
/// Some(vec![Some(0), Some(1)]),
Expand All @@ -151,11 +170,11 @@ pub trait FixedSizeListArrayExt {
/// Some(vec![Some(8), Some(9)])
/// ], 2))
/// ```
fn new<T: Array>(values: T, list_size: i32) -> Result<FixedSizeListArray>;
fn try_new<T: Array>(values: T, list_size: i32) -> Result<FixedSizeListArray>;
}

impl FixedSizeListArrayExt for FixedSizeListArray {
fn new<T: Array>(values: T, list_size: i32) -> Result<Self> {
fn try_new<T: Array>(values: T, list_size: i32) -> Result<Self> {
let list_type = DataType::FixedSizeList(
Box::new(Field::new("item", values.data_type().clone(), true)),
list_size,
Expand All @@ -178,7 +197,7 @@ pub trait FixedSizeBinaryArrayExt {
/// use lance::arrow::FixedSizeBinaryArrayExt;
///
/// let int_values = UInt8Array::from_iter(0..10);
/// let fixed_size_list_arr = FixedSizeBinaryArray::new(&int_values, 2).unwrap();
/// let fixed_size_list_arr = FixedSizeBinaryArray::try_new(&int_values, 2).unwrap();
/// assert_eq!(fixed_size_list_arr,
/// FixedSizeBinaryArray::from(vec![
/// Some(vec![0, 1].as_slice()),
Expand All @@ -188,11 +207,11 @@ pub trait FixedSizeBinaryArrayExt {
/// Some(vec![8, 9].as_slice())
/// ]))
/// ```
fn new(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
fn try_new(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}

impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
fn new(values: &UInt8Array, stride: i32) -> Result<Self> {
fn try_new(values: &UInt8Array, stride: i32) -> Result<Self> {
let data_type = DataType::FixedSizeBinary(stride);
let data = ArrayDataBuilder::new(data_type)
.len(values.len() / stride as usize)
Expand Down
31 changes: 21 additions & 10 deletions rust/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ impl LogicalType {
self.0 == "list" || self.0 == "list.struct"
}

/// True when this logical type encodes an Arrow `LargeList`, with either a
/// primitive or a struct element type.
fn is_large_list(&self) -> bool {
    matches!(self.0.as_str(), "large_list" | "large_list.struct")
}

fn is_struct(&self) -> bool {
self.0 == "struct"
}
Expand Down Expand Up @@ -91,6 +95,10 @@ impl TryFrom<&DataType> for LogicalType {
DataType::Struct(_) => "list.struct".to_string(),
_ => "list".to_string(),
},
DataType::LargeList(elem) => match elem.data_type() {
DataType::Struct(_) => "large_list.struct".to_string(),
_ => "large_list".to_string(),
},
DataType::FixedSizeList(dt, len) => format!(
"fixed_size_list:{}:{}",
LogicalType::try_from(dt.data_type())?.0,
Expand All @@ -100,7 +108,7 @@ impl TryFrom<&DataType> for LogicalType {
_ => return Err(Error::Schema(format!("Unsupport data type: {:?}", dt))),
};

Ok(Self(type_str.to_string()))
Ok(Self(type_str))
}
}

Expand Down Expand Up @@ -141,7 +149,7 @@ impl TryFrom<&LogicalType> for DataType {
} {
Ok(t)
} else {
let splits = lt.0.split(":").collect::<Vec<_>>();
let splits = lt.0.split(':').collect::<Vec<_>>();
match splits[0] {
"fixed_size_list" => {
if splits.len() != 3 {
Expand Down Expand Up @@ -173,10 +181,7 @@ impl TryFrom<&LogicalType> for DataType {
} else {
let index_type: DataType = (&LogicalType::from(splits[1])).try_into()?;
let value_type: DataType = (&LogicalType::from(splits[2])).try_into()?;
Ok(DataType::Dictionary(
Box::new(index_type),
Box::new(value_type),
))
Ok(Dictionary(Box::new(index_type), Box::new(value_type)))
}
}
_ => Err(Error::Schema(format!("Unsupported logical type: {}", lt))),
Expand Down Expand Up @@ -236,6 +241,9 @@ impl Field {
pub fn data_type(&self) -> DataType {
match &self.logical_type {
lt if lt.is_list() => DataType::List(Box::new(ArrowField::from(&self.children[0]))),
lt if lt.is_large_list() => {
DataType::LargeList(Box::new(ArrowField::from(&self.children[0])))
}
lt if lt.is_struct() => {
DataType::Struct(self.children.iter().map(ArrowField::from).collect())
}
Expand All @@ -262,8 +270,8 @@ impl Field {
});
}

fn project(&self, path_components: &[&str]) -> Result<Field> {
let mut f = Field {
fn project(&self, path_components: &[&str]) -> Result<Self> {
let mut f = Self {
name: self.name.clone(),
id: self.id,
parent_id: self.parent_id,
Expand Down Expand Up @@ -403,6 +411,7 @@ impl TryFrom<&ArrowField> for Field {
children.iter().map(Self::try_from).collect::<Result<_>>()?
}
DataType::List(item) => vec![Self::try_from(item.as_ref())?],
DataType::LargeList(item) => vec![Self::try_from(item.as_ref())?],
_ => vec![],
};
Ok(Self {
Expand All @@ -414,6 +423,8 @@ impl TryFrom<&ArrowField> for Field {
dt if dt.is_fixed_stride() => Some(Encoding::Plain),
dt if dt.is_binary_like() => Some(Encoding::VarBinary),
DataType::Dictionary(_, _) => Some(Encoding::Dictionary),
// Use plain encoder to store the offsets of list.
DataType::List(_) | DataType::LargeList(_) => Some(Encoding::Plain),
_ => None,
},
extension_name: "".to_string(),
Expand Down Expand Up @@ -477,7 +488,7 @@ impl From<&Field> for pb::Field {
impl From<&Field> for Vec<pb::Field> {
fn from(field: &Field) -> Self {
let mut protos = vec![pb::Field::from(field)];
protos.extend(field.children.iter().flat_map(|c| Self::from(c)));
protos.extend(field.children.iter().flat_map(Self::from));
protos
}
}
Expand Down Expand Up @@ -515,7 +526,7 @@ impl Schema {
}
}

Ok(Schema {
Ok(Self {
fields: candidates,
metadata: self.metadata.clone(),
})
Expand Down
23 changes: 13 additions & 10 deletions rust/src/encodings/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl<'a> PlainDecoder<'a> {
self.length * (*list_size) as usize,
)?;
let item_array = item_decoder.decode().await?;
Ok(Arc::new(FixedSizeListArray::new(item_array, *list_size)?) as ArrayRef)
Ok(Arc::new(FixedSizeListArray::try_new(item_array, *list_size)?) as ArrayRef)
}

async fn decode_fixed_size_binary(&self, stride: &i32) -> Result<ArrayRef> {
Expand All @@ -179,7 +179,7 @@ impl<'a> PlainDecoder<'a> {
.ok_or_else(|| {
Error::Schema("Could not cast to UInt8Array for FixedSizeBinary".to_string())
})?;
Ok(Arc::new(FixedSizeBinaryArray::new(values, *stride)?) as ArrayRef)
Ok(Arc::new(FixedSizeBinaryArray::try_new(values, *stride)?) as ArrayRef)
}
}

Expand Down Expand Up @@ -297,7 +297,7 @@ mod tests {
for t in int_types {
let buffer = Buffer::from_slice_ref(input.as_slice());
let items = make_array_(&t, &buffer).await;
let arr = FixedSizeListArray::new(items, 3).unwrap();
let arr = FixedSizeListArray::try_new(items, 3).unwrap();
let list_type = DataType::FixedSizeList(Box::new(Field::new("item", t, true)), 3);
test_round_trip(Arc::new(arr) as ArrayRef, list_type).await;
}
Expand All @@ -308,13 +308,13 @@ mod tests {
for t in float_types {
let buffer = Buffer::from_slice_ref(input.as_slice());
let items = make_array_(&t, &buffer).await;
let arr = FixedSizeListArray::new(items, 3).unwrap();
let arr = FixedSizeListArray::try_new(items, 3).unwrap();
let list_type = DataType::FixedSizeList(Box::new(Field::new("item", t, true)), 3);
test_round_trip(Arc::new(arr) as ArrayRef, list_type).await;
}

let items = BooleanArray::from(vec![true, false, true].repeat(42));
let arr = FixedSizeListArray::new(items, 3).unwrap();
let arr = FixedSizeListArray::try_new(items, 3).unwrap();
let list_type =
DataType::FixedSizeList(Box::new(Field::new("item", DataType::Boolean, true)), 3);
test_round_trip(Arc::new(arr) as ArrayRef, list_type).await;
Expand All @@ -324,7 +324,7 @@ mod tests {
async fn test_encode_decode_fixed_size_binary_array() {
let t = DataType::FixedSizeBinary(3);
let values = UInt8Array::from(Vec::from_iter(1..127 as u8));
let arr = FixedSizeBinaryArray::new(&values, 3).unwrap();
let arr = FixedSizeBinaryArray::try_new(&values, 3).unwrap();
test_round_trip(Arc::new(arr) as ArrayRef, t).await;
}

Expand All @@ -334,16 +334,19 @@ mod tests {
let inner = DataType::FixedSizeList(Box::new(Field::new("item", DataType::Int64, true)), 2);
let t = DataType::FixedSizeList(Box::new(Field::new("item", inner, true)), 2);
let values = Int64Array::from_iter_values(1..=120 as i64);
let arr = FixedSizeListArray::new(FixedSizeListArray::new(values, 2).unwrap(), 2).unwrap();
let arr = FixedSizeListArray::try_new(FixedSizeListArray::try_new(values, 2).unwrap(), 2)
.unwrap();
test_round_trip(Arc::new(arr) as ArrayRef, t).await;

// FixedSizeList of FixedSizeBinary
let inner = DataType::FixedSizeBinary(2);
let t = DataType::FixedSizeList(Box::new(Field::new("item", inner, true)), 2);
let values = UInt8Array::from_iter_values(1..=120 as u8);
let arr =
FixedSizeListArray::new(FixedSizeBinaryArray::new(&values, 2).unwrap().borrow(), 2)
.unwrap();
let arr = FixedSizeListArray::try_new(
FixedSizeBinaryArray::try_new(&values, 2).unwrap().borrow(),
2,
)
.unwrap();
test_round_trip(Arc::new(arr) as ArrayRef, t).await;
}

Expand Down
22 changes: 20 additions & 2 deletions rust/src/io/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;

use arrow_arith::arithmetic::subtract_scalar;
use arrow_array::cast::as_primitive_array;
use arrow_array::{ArrayRef, ListArray, RecordBatch, StructArray};
use arrow_array::{ArrayRef, Int64Array, LargeListArray, ListArray, RecordBatch, StructArray};
use arrow_schema::DataType;
use async_recursion::async_recursion;
use byteorder::{ByteOrder, LittleEndian};
Expand Down Expand Up @@ -224,6 +224,7 @@ impl<'a> FileReader<'a> {

async fn read_list_array(&self, field: &Field, batch_id: i32) -> Result<ArrayRef> {
let page_info = self.page_info(field, batch_id)?;

let position_arr = self
.object_reader
.read_fixed_stride_array(&DataType::Int32, page_info.position, page_info.length)
Expand All @@ -234,7 +235,23 @@ impl<'a> FileReader<'a> {
let offset_arr = subtract_scalar(positions, start_position)?;
let value_arrs = self.read_array(&field.children[0], batch_id).await?;

Ok(Arc::new(ListArray::new(value_arrs, &offset_arr)?))
Ok(Arc::new(ListArray::try_new(value_arrs, &offset_arr)?))
}

// TODO: merge with [read_list_array]?
/// Read one batch of a `LargeList` column: loads the stored i64 list
/// positions, rebases them to start at zero, recursively reads the child
/// values array, and stitches both into a [`LargeListArray`].
async fn read_large_list_array(&self, field: &Field, batch_id: i32) -> Result<ArrayRef> {
let page_info = self.page_info(field, batch_id)?;
// List positions are stored as a plain fixed-stride Int64 array at the
// page's recorded position/length.
let position_arr = self
.object_reader
.read_fixed_stride_array(&DataType::Int64, page_info.position, page_info.length)
.await?;
let positions: &Int64Array = as_primitive_array(position_arr.as_ref());
let start_position = positions.value(0);
// Compute offsets relative to this batch: stored positions appear to be
// file-absolute, so subtracting the first yields zero-based list offsets.
let offset_arr = subtract_scalar(positions, start_position)?;
// The flat values array comes from the field's single child, read
// recursively (it may itself be nested).
let value_arrs = self.read_array(&field.children[0], batch_id).await?;

Ok(Arc::new(LargeListArray::try_new(value_arrs, &offset_arr)?))
}

/// Read an array of the batch.
Expand All @@ -254,6 +271,7 @@ impl<'a> FileReader<'a> {
Struct(_) => self.read_struct_array(field, batch_id).await,
Dictionary(_, _) => self.read_dictionary_array(field, batch_id).await,
List(_) => self.read_list_array(field, batch_id).await,
LargeList(_) => self.read_large_list_array(field, batch_id).await,
_ => {
unimplemented!("{}", format!("No support for {data_type} yet"));
}
Expand Down
Loading

0 comments on commit 25cf897

Please sign in to comment.