Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix for BinaryEncoder positions #698

Merged
merged 2 commits into the base branch from the source branch
Mar 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions rust/src/encodings/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
use std::sync::Arc;

use arrow_arith::arithmetic::subtract_scalar;
use arrow_array::builder::{ArrayBuilder, PrimitiveBuilder};
use arrow_array::builder::PrimitiveBuilder;
use arrow_array::{
cast::as_primitive_array,
new_empty_array,
Expand Down Expand Up @@ -61,7 +61,7 @@ impl<'a> BinaryEncoder<'a> {
PrimitiveBuilder::with_capacity(capacity);

let mut value_offset: usize = self.writer.tell();
for array in arrs {
for (a_idx, array) in arrs.iter().enumerate() {
let arr = array
.as_any()
.downcast_ref::<GenericByteArray<T>>()
Expand All @@ -81,11 +81,19 @@ impl<'a> BinaryEncoder<'a> {

let start_offset = offsets[0];
// Did not use `add_scalar(positions, value_offset)`, so we can save a memory copy.
offsets.iter().for_each(|o| {
offsets.iter().enumerate().for_each(|(o_idx, o)| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be made simpler, like:

let pos_builder = Builder::new(...)
pos_builder.append(start_pos);

for array in arrs.iter() {
    let offsets = array.offsets_array();
    let last_offset = pos_builder.last() # somehow
    pos_builder.append_iter(offsets.iter().skip(1).map(|p| p + last))
}

?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no `last()` method, but maybe `pos_builder.values_slice()[pos_builder.len() - 1]` would work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to this issue: #700

let new_offset = ((*o - start_offset).as_usize() + value_offset) as i64;
pos_builder.append_value(new_offset);

// Each GenericByteArray of len X has a corresponding offset array of len X + 1
// Since we are merging multiple arrays together, we only need to copy the "extra"
// element of the last array
if (offsets.len() != o_idx + 1) || (arrs.len() == a_idx + 1) {
pos_builder.append_value(new_offset);
} else {
// The next offset array should start where this one ends
value_offset = new_offset as usize;
}
});
value_offset = (pos_builder.values_slice()[pos_builder.len() - 1] + 1) as usize;
}

for buf in buffers {
Expand Down Expand Up @@ -447,7 +455,12 @@ mod tests {
async fn test_write_binary_data() {
test_round_trips(&[&StringArray::from(vec!["a", "b", "cd", "efg"])]).await;
test_round_trips(&[&StringArray::from(vec![Some("a"), None, Some("cd"), None])]).await;

test_round_trips(&[
&StringArray::from(vec![Some("a"), None, Some("cd"), None]),
&StringArray::from(vec![Some("f"), None, Some("gh"), None]),
&StringArray::from(vec![Some("t"), None, Some("uv"), None]),
])
.await;
test_round_trips(&[&LargeStringArray::from(vec!["a", "b", "cd", "efg"])]).await;
test_round_trips(&[&LargeStringArray::from(vec![
Some("a"),
Expand Down
52 changes: 38 additions & 14 deletions rust/src/io/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,25 +796,34 @@ mod tests {
let store = ObjectStore::memory();
let path = Path::from("/null_strings");

let (arrow_schema, struct_array) = make_struct_of_list_array(10, 10);
let arrow_schema = make_schema_of_list_array();
let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap();

let batches = (0..3)
.map(|_| {
let struct_array = make_struct_of_list_array(10, 10);
RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
})
.collect::<Vec<_>>();
let batches_ref = batches.iter().collect::<Vec<_>>();

let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
file_writer.write(&[&batch]).await.unwrap();
file_writer.write(batches_ref.as_slice()).await.unwrap();
file_writer.finish().await.unwrap();

let reader = FileReader::try_new(&store, &path).await.unwrap();
let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
assert_eq!(batch, actual_batch);
let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
assert_eq!(expected, actual_batch);
}

#[tokio::test]
async fn test_scan_struct_of_list_arrays() {
let store = ObjectStore::memory();
let path = Path::from("/null_strings");

let (arrow_schema, struct_array) = make_struct_of_list_array(3, 10);
let arrow_schema = make_schema_of_list_array();
let struct_array = make_struct_of_list_array(3, 10);
let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

Expand Down Expand Up @@ -848,11 +857,8 @@ mod tests {
assert_eq!(expected_batch, slice_of_batch);
}

fn make_struct_of_list_array(
rows: i32,
num_items: i32,
) -> (Arc<arrow_schema::Schema>, Arc<StructArray>) {
let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
Arc::new(ArrowSchema::new(vec![ArrowField::new(
"s",
DataType::Struct(vec![
ArrowField::new(
Expand All @@ -865,23 +871,34 @@ mod tests {
DataType::List(Box::new(ArrowField::new("item", DataType::Utf8, true))),
true,
),
ArrowField::new(
"ll",
DataType::LargeList(Box::new(ArrowField::new("item", DataType::Int32, true))),
false,
),
]),
true,
)]));
)]))
}

fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
let mut li_builder = ListBuilder::new(Int32Builder::new());
let mut ls_builder = ListBuilder::new(StringBuilder::new());
let ll_value_builder = Int32Builder::new();
let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
for i in 0..rows {
for j in 0..num_items {
li_builder.values().append_value(i * 10 + j);
ls_builder
.values()
.append_value(format!("str-{}", i * 10 + j));
large_list_builder.values().append_value(i * 10 + j);
}
li_builder.append(true);
ls_builder.append(true);
large_list_builder.append(true);
}
let struct_array = Arc::new(StructArray::from(vec![
Arc::new(StructArray::from(vec![
(
ArrowField::new(
"li",
Expand All @@ -898,8 +915,15 @@ mod tests {
),
Arc::new(ls_builder.finish()) as ArrayRef,
),
]));
(arrow_schema, struct_array)
(
ArrowField::new(
"ll",
DataType::LargeList(Box::new(ArrowField::new("item", DataType::Int32, true))),
false,
),
Arc::new(large_list_builder.finish()) as ArrayRef,
),
]))
}

#[tokio::test]
Expand Down
66 changes: 43 additions & 23 deletions rust/src/io/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::sync::Arc;

use arrow_arith::arithmetic::subtract_scalar;
use arrow_array::builder::PrimitiveBuilder;
use arrow_array::cast::{as_large_list_array, as_list_array, as_struct_array};
use arrow_array::{Array, ArrayRef, Int32Array, Int64Array, RecordBatch, StructArray};
use arrow_array::types::{Int32Type, Int64Type};
use arrow_array::{Array, ArrayRef, RecordBatch, StructArray};
use arrow_schema::DataType;
use async_recursion::async_recursion;
use object_store::path::Path;
Expand Down Expand Up @@ -271,43 +271,63 @@ impl<'a> FileWriter<'a> {
}

async fn write_list_array(&mut self, field: &Field, arrs: &[&dyn Array]) -> Result<()> {
let capacity: usize = arrs.iter().map(|a| a.len()).sum();
let mut list_arrs: Vec<&ArrayRef> = Vec::new();
let mut offsets: Vec<ArrayRef> = Vec::new();
let mut pos_builder: PrimitiveBuilder<Int32Type> =
PrimitiveBuilder::with_capacity(capacity);

for array in arrs {
let mut initial_offset: usize = 0;
for (a_idx, array) in arrs.iter().enumerate() {
let list_arr = as_list_array(*array);
let value_offsets: Int32Array = list_arr.value_offsets().iter().copied().collect();
assert!(!value_offsets.is_empty());
let offsets_arr =
Arc::new(subtract_scalar(&value_offsets, value_offsets.value(0))?) as ArrayRef;
list_arrs.push(list_arr.values());
offsets.push(offsets_arr);

let value_offsets = list_arr.value_offsets();
assert!(!value_offsets.is_empty());
let start_offset = value_offsets[0];
value_offsets.iter().enumerate().for_each(|(o_idx, o)| {
let new_offset = ((*o - start_offset) + initial_offset as i32) as i32;

if (value_offsets.len() != o_idx + 1) || (arrs.len() == a_idx + 1) {
pos_builder.append_value(new_offset);
} else {
initial_offset = new_offset as usize;
}
});
}

let offsets_ref = offsets.iter().map(|l| l.deref()).collect::<Vec<_>>();
self.write_fixed_stride_array(field, offsets_ref.as_slice())
.await?;
let positions: &dyn Array = &pos_builder.finish();
self.write_fixed_stride_array(field, &[positions]).await?;
self.write_array(&field.children[0], list_arrs.as_slice())
.await
}

async fn write_large_list_array(&mut self, field: &Field, arrs: &[&dyn Array]) -> Result<()> {
let capacity: usize = arrs.iter().map(|a| a.len()).sum();
let mut list_arrs: Vec<&ArrayRef> = Vec::new();
let mut offsets: Vec<ArrayRef> = Vec::new();
let mut pos_builder: PrimitiveBuilder<Int64Type> =
PrimitiveBuilder::with_capacity(capacity);

for array in arrs {
let mut initial_offset: usize = 0;
for (a_idx, array) in arrs.iter().enumerate() {
let list_arr = as_large_list_array(*array);
let value_offsets: Int64Array = list_arr.value_offsets().iter().copied().collect();
assert!(!value_offsets.is_empty());
let offsets_arr =
Arc::new(subtract_scalar(&value_offsets, value_offsets.value(0))?) as ArrayRef;
list_arrs.push(list_arr.values());
offsets.push(offsets_arr);

let value_offsets = list_arr.value_offsets();
assert!(!value_offsets.is_empty());
let start_offset = value_offsets[0];
value_offsets.iter().enumerate().for_each(|(o_idx, o)| {
let new_offset = (*o - start_offset) + initial_offset as i64;

if (value_offsets.len() != o_idx + 1) || (arrs.len() == a_idx + 1) {
pos_builder.append_value(new_offset);
} else {
initial_offset = new_offset as usize;
}
});
}

let offsets_ref = offsets.iter().map(|l| l.deref()).collect::<Vec<_>>();
self.write_fixed_stride_array(field, offsets_ref.as_slice())
.await?;
let positions: &dyn Array = &pos_builder.finish();
self.write_fixed_stride_array(field, &[positions]).await?;
self.write_array(&field.children[0], list_arrs.as_slice())
.await
}
Expand Down