Do not concatenate identical dictionaries #1219

Merged
merged 2 commits into from
Jan 24, 2022
74 changes: 74 additions & 0 deletions arrow/src/array/data.rs
@@ -1155,6 +1155,41 @@ impl ArrayData {
Ok(())
})
}

/// Returns true if this `ArrayData` is equal to `other`, using pointer comparisons
Contributor: 👍

/// to determine buffer equality. This is cheaper than `PartialEq::eq` but may
/// return false negatives
Contributor: In what case would this return a false negative (to the "are these two pointers the same" question)?

Contributor Author: It may say two things are not equal when logically they are?

Contributor Author: I updated the wording.

pub fn ptr_eq(&self, other: &Self) -> bool {
if self.offset != other.offset
|| self.len != other.len
|| self.null_count != other.null_count
|| self.data_type != other.data_type
|| self.buffers.len() != other.buffers.len()
|| self.child_data.len() != other.child_data.len()
{
return false;
}

match (&self.null_bitmap, &other.null_bitmap) {
(Some(a), Some(b)) if a.bits.as_ptr() != b.bits.as_ptr() => return false,
(Some(_), None) | (None, Some(_)) => return false,
_ => {}
};

if !self
.buffers
.iter()
.zip(other.buffers.iter())
.all(|(a, b)| a.as_ptr() == b.as_ptr())
{
return false;
}

self.child_data
.iter()
.zip(other.child_data.iter())
.all(|(a, b)| a.ptr_eq(b))
}
}
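
The "false negative" raised in the review thread can be illustrated without Arrow. The following is a minimal sketch using only the standard library; `Data`, `slice`, and `values` are hypothetical stand-ins for illustration, not the arrow crate's types, and `Arc::ptr_eq` takes the place of the buffer pointer comparison. It shows two values that are logically equal but fail the pointer check because they live in different allocations or cover a different window.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `ArrayData`: a shared buffer plus a logical window.
#[derive(Clone, Debug)]
struct Data {
    buffer: Arc<Vec<i32>>,
    offset: usize,
    len: usize,
}

impl Data {
    fn new(values: Vec<i32>) -> Self {
        let len = values.len();
        Data { buffer: Arc::new(values), offset: 0, len }
    }

    /// Zero-copy slice: same allocation, different window.
    fn slice(&self, offset: usize, len: usize) -> Self {
        assert!(offset + len <= self.len, "slice out of bounds");
        Data {
            buffer: Arc::clone(&self.buffer),
            offset: self.offset + offset,
            len,
        }
    }

    /// The logical values visible through the window.
    fn values(&self) -> &[i32] {
        &self.buffer[self.offset..self.offset + self.len]
    }

    /// Cheap check: compare metadata, then allocation identity.
    /// Never reads the buffer contents.
    fn ptr_eq(&self, other: &Self) -> bool {
        self.offset == other.offset
            && self.len == other.len
            && Arc::ptr_eq(&self.buffer, &other.buffer)
    }
}

/// Logical equality: compares the visible values element by element.
impl PartialEq for Data {
    fn eq(&self, other: &Self) -> bool {
        self.values() == other.values()
    }
}

/// Walks through the cases; call this from `main` to run the checks.
fn demo() {
    let a = Data::new(vec![1, 2, 3]);
    let rebuilt = Data::new(vec![1, 2, 3]);

    assert!(a.ptr_eq(&a.clone())); // a clone shares the allocation
    assert!(a == rebuilt); // logically equal...
    assert!(!a.ptr_eq(&rebuilt)); // ...but a false negative for ptr_eq
    assert!(!a.ptr_eq(&a.slice(1, 2))); // same allocation, different window
}
```

In this model a `true` result from `ptr_eq` implies logical equality, while `false` says nothing either way, which is why it is only suitable as a fast path.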

/// Return the expected [`DataTypeLayout`] Arrays of this data
@@ -1547,12 +1582,51 @@ mod tests {
.add_buffer(make_i32_buffer(1))
.build()
.unwrap();

let float_data = ArrayData::builder(DataType::Float32)
.len(1)
.add_buffer(make_f32_buffer(1))
.build()
.unwrap();
assert_ne!(int_data, float_data);
assert!(!int_data.ptr_eq(&float_data));
assert!(int_data.ptr_eq(&int_data));

let int_data_clone = int_data.clone();
assert_eq!(int_data, int_data_clone);
assert!(int_data.ptr_eq(&int_data_clone));
assert!(int_data_clone.ptr_eq(&int_data));

let int_data_slice = int_data_clone.slice(1, 0);
assert!(int_data_slice.ptr_eq(&int_data_slice));
assert!(!int_data.ptr_eq(&int_data_slice));
assert!(!int_data_slice.ptr_eq(&int_data));

let data_buffer = Buffer::from_slice_ref(&"abcdef".as_bytes());
let offsets_buffer = Buffer::from_slice_ref(&[0_i32, 2_i32, 2_i32, 5_i32]);
let string_data = ArrayData::try_new(
DataType::Utf8,
3,
Some(1),
Some(Buffer::from_iter(vec![true, false, true])),
0,
vec![offsets_buffer, data_buffer],
vec![],
)
.unwrap();

assert_ne!(float_data, string_data);
assert!(!float_data.ptr_eq(&string_data));

assert!(string_data.ptr_eq(&string_data));

let string_data_cloned = string_data.clone();
assert!(string_data_cloned.ptr_eq(&string_data));
assert!(string_data.ptr_eq(&string_data_cloned));

let string_data_slice = string_data.slice(1, 2);
assert!(string_data_slice.ptr_eq(&string_data_slice));
assert!(!string_data_slice.ptr_eq(&string_data))
}

#[test]
67 changes: 39 additions & 28 deletions arrow/src/array/transform/mod.rs
@@ -520,34 +520,40 @@ impl<'a> MutableArrayData<'a> {
}
};

- let dictionary = match &data_type {
- DataType::Dictionary(_, _) => match arrays.len() {
- 0 => unreachable!(),
- 1 => Some(arrays[0].child_data()[0].clone()),
- _ => {
- if let Capacities::Dictionary(_, _) = capacities {
- panic!("dictionary capacity not yet supported")
- }
- // Concat dictionaries together
- let dictionaries: Vec<_> =
- arrays.iter().map(|array| &array.child_data()[0]).collect();
- let lengths: Vec<_> = dictionaries
- .iter()
- .map(|dictionary| dictionary.len())
- .collect();
- let capacity = lengths.iter().sum();
-
- let mut mutable =
- MutableArrayData::new(dictionaries, false, capacity);
-
- for (i, len) in lengths.iter().enumerate() {
- mutable.extend(i, 0, *len)
- }
+ // Get the dictionary if any, and if it is a concatenation of multiple
+ let (dictionary, dict_concat) = match &data_type {
+ DataType::Dictionary(_, _) => {
+ // If more than one dictionary, concatenate dictionaries together
+ let dict_concat = !arrays
+ .windows(2)
+ .all(|a| a[0].child_data()[0].ptr_eq(&a[1].child_data()[0]));
+
+ match dict_concat {
+ false => (Some(arrays[0].child_data()[0].clone()), false),
+ true => {
+ if let Capacities::Dictionary(_, _) = capacities {
+ panic!("dictionary capacity not yet supported")
+ }
+ let dictionaries: Vec<_> =
+ arrays.iter().map(|array| &array.child_data()[0]).collect();
+ let lengths: Vec<_> = dictionaries
+ .iter()
+ .map(|dictionary| dictionary.len())
+ .collect();
+ let capacity = lengths.iter().sum();
+
+ let mut mutable =
+ MutableArrayData::new(dictionaries, false, capacity);
+
+ for (i, len) in lengths.iter().enumerate() {
+ mutable.extend(i, 0, *len)
+ }

- Some(mutable.freeze())
+ (Some(mutable.freeze()), true)
+ }
}
- },
- _ => None,
+ }
+ _ => (None, false),
};

let extend_nulls = build_extend_nulls(data_type);
@@ -572,8 +578,13 @@ impl<'a> MutableArrayData<'a> {
.iter()
.map(|array| {
let offset = next_offset;
- next_offset += array.child_data()[0].len();
- build_extend_dictionary(array, offset, next_offset)
+ let dict_len = array.child_data()[0].len();
+
+ if dict_concat {
+ next_offset += dict_len;
+ }
+
+ build_extend_dictionary(array, offset, offset + dict_len)
.ok_or(ArrowError::DictionaryKeyOverflowError)
})
.collect();
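
The two hunks above work together: the first decides whether the input dictionaries must be concatenated, and the second only shifts key offsets when they were. The following is a simplified sketch of that idea using only the standard library. `DictArray`, `concat`, and `decode` are hypothetical names for illustration, `Arc::ptr_eq` stands in for `ArrayData::ptr_eq`, and key overflow handling is omitted, so this is a model of the approach rather than the arrow implementation.

```rust
use std::sync::Arc;

/// Hypothetical, simplified dictionary array: `keys` index into shared `values`.
#[derive(Clone, Debug)]
struct DictArray {
    keys: Vec<usize>,
    values: Arc<Vec<String>>,
}

/// Concatenate dictionary arrays, reusing the dictionary when every input
/// points at the same allocation (checked pairwise via `windows(2)`).
fn concat(arrays: &[DictArray]) -> DictArray {
    assert!(!arrays.is_empty(), "need at least one input");

    // True if any adjacent pair of dictionaries lives in different allocations.
    // A single input yields no windows, so `all` is true and nothing is copied.
    let dict_concat = !arrays
        .windows(2)
        .all(|w| Arc::ptr_eq(&w[0].values, &w[1].values));

    let values = if dict_concat {
        // Dictionaries differ: append all of them into a fresh dictionary.
        let merged: Vec<String> = arrays
            .iter()
            .flat_map(|a| a.values.iter().cloned())
            .collect();
        Arc::new(merged)
    } else {
        // All inputs share one dictionary: reuse it without copying.
        Arc::clone(&arrays[0].values)
    };

    let mut keys = Vec::new();
    let mut next_offset = 0;
    for array in arrays {
        let offset = next_offset;
        // Only advance the offset when the dictionaries were appended.
        if dict_concat {
            next_offset += array.values.len();
        }
        keys.extend(array.keys.iter().map(|k| k + offset));
    }

    DictArray { keys, values }
}

/// Resolve keys back to their string values, for checking results.
fn decode(array: &DictArray) -> Vec<&str> {
    array.keys.iter().map(|&k| array.values[k].as_str()).collect()
}
```

With two inputs that share one dictionary, the result keeps the original allocation and the keys are copied unchanged. If a third input with a different dictionary is added, this sketch appends all three dictionaries, duplicates included, and shifts each input's keys by the combined length of the dictionaries before it.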
40 changes: 40 additions & 0 deletions arrow/src/compute/kernels/concat.rs
@@ -525,4 +525,44 @@ mod tests {

Ok(())
}

#[test]
fn test_dictionary_concat_reuse() {
let array: DictionaryArray<Int8Type> =
vec!["a", "a", "b", "c"].into_iter().collect();
let array_copy: DictionaryArray<Int8Type> = array.data().clone().into();

// dictionary is "a", "b", "c"
assert_eq!(
array.values(),
&(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef)
);
assert_eq!(array.keys(), &Int8Array::from(vec![0, 0, 1, 2]));

// concatenate it with itself
let combined = concat(&[&array_copy as _, &array as _]).unwrap();

let combined = combined
.as_any()
.downcast_ref::<DictionaryArray<Int8Type>>()
.unwrap();

assert_eq!(
combined.values(),
&(Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef),
"Actual: {:#?}",
combined
);

assert_eq!(
combined.keys(),
&Int8Array::from(vec![0, 0, 1, 2, 0, 0, 1, 2])
);

// Should have reused the dictionary
assert!(array.data().child_data()[0].ptr_eq(&combined.data().child_data()[0]));
assert!(
array_copy.data().child_data()[0].ptr_eq(&combined.data().child_data()[0])
);
}
Contributor: Can we also add a test of concatenating three dictionaries, where two use the same dictionary and one is a different dictionary?

Contributor Author: Added.

}