-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize row hash #6065
Optimize row hash #6065
Conversation
Performance improvement with optimized 2.4 GHz 8-Core Intel Core i9 Query |
.map(|array| array.slice(offsets[0], offsets[1] - offsets[0])) | ||
.collect(); | ||
let null_array = Arc::new(NullArray::new(0)) as ArrayRef; | ||
let mut sliced_arrays: Vec<ArrayRef> = vec![null_array; aggr_array.len()]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see why this should be faster? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The key point of this PR is to get rid of extra allocations #5969 (comment) and allows 20-25% speed gain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another optimization that we traverse input collection only once, instead of 2 times in original implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See #5969 (comment) -- reported performance improvement
I will do some test locally tomorrow. |
|
if let Some(f) = filter_opt { | ||
let sliced = f.slice(offsets[0], offsets[1] - offsets[0]); | ||
let filter_array = as_boolean_array(&sliced)?; | ||
|
||
sliced_arrays | ||
.iter() | ||
.map(|array| filter(array, filter_array).unwrap()) | ||
.collect::<Vec<ArrayRef>>() | ||
for (i, arr) in aggr_array.iter().enumerate() { | ||
let sliced = &arr.slice(offsets[0], offsets[1] - offsets[0]); | ||
sliced_arrays[i] = filter(sliced, filter_array).unwrap(); | ||
} | ||
} else { | ||
for (i, arr) in aggr_array.iter().enumerate() { | ||
sliced_arrays[i] = arr.slice(offsets[0], offsets[1] - offsets[0]); | ||
} | ||
None => sliced_arrays, | ||
}; | ||
Ok(filtered_arrays) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think writing these loops as a zip of aggr_array.iter()
and sliced_arrays.iter_mut()
and avoiding the sliced_arrays[i]
access inside the loop can make the code (1) a little more idiomatic, and (2) may result in less implicit bounds-checking at run-time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With zip
might even use collect
into Vec
again to avoid initializing the vec
.
@comphead Before this PR: q17(sf =1) q17(sf =10) this PR: q17(sf =10) How this was tested: if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
&& normal_aggr_input_values.is_empty()
&& normal_filter_values.is_empty()
&& groups_with_rows.len() >= batch.num_rows() / 10 change the magic number from if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
&& normal_aggr_input_values.is_empty()
&& normal_filter_values.is_empty()
&& groups_with_rows.len() >= batch.num_rows() / 1 So that the update accumulators will use the method |
cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./parquet_data10 --format parquet --query 17 -n 1 --disable-statistics cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./parquet_data --format parquet --query 17 -n 1 --disable-statistics My Mac: |
.collect::<Vec<ArrayRef>>() | ||
for (i, arr) in aggr_array.iter().enumerate() { | ||
let sliced = &arr.slice(offsets[0], offsets[1] - offsets[0]); | ||
sliced_arrays[i] = filter(sliced, filter_array).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I don't think it's a good idea to do more than one thing in a loop, especially the slice
of Array is not so light weight behavior. And I would prefer the previous implementation.
From the Flame Graph, it shows the hot path of the method impl<T, I> SpecFromIterNested<T, I> for Vec<T>
where
I: TrustedLen<Item = T>,
{
fn from_iter(iterator: I) -> Self {
let mut vector = match iterator.size_hint() {
(_, Some(upper)) => Vec::with_capacity(upper),
// TrustedLen contract guarantees that `size_hint() == (_, None)` means that there
// are more than `usize::MAX` elements.
// Since the previous branch would eagerly panic if the capacity is too large
// (via `with_capacity`) we do the same here.
_ => panic!("capacity overflow"),
};
// reuse extend specialization for TrustedLen
vector.spec_extend(iterator);
vector
}
} |
https://doc.rust-lang.org/std/iter/trait.TrustedLen.html#impl-TrustedLen-for-Iter%3C'_,+T%3E-1
|
Thanks folks for the feedback @mingmwang this change was tested before you PR #6003 merged and only for q32. I will retest the latest codebase soon with other benchmarks. @ozankabak mutating by dereferencing sounds good to me, I will test it out @yahoNanJing I'm not getting your part, are you saying 2 iterations each doing the operation is the faster than 1 iteration doing 2 ops every iteration? I will retest it soon, and share results. If there is still no perf benefit after #6003 I will close the PR. |
Can someone else help to test and verify this on other machines? |
I will run test this PR using https://github.com/alamb/datafusion-benchmarking/blob/main/bench.sh |
Here are some benchmark results:
For reference, here is the same benchmark run against
|
I have also double checked from my side with latest main which also includes row_hash optimizations from @mingmwang datafusion-cli built in release mode With this PR
Without PR
Before #6003 DF took 62 and 58 sec respectively. |
Thanks @comphead and @mingmwang and @ozankabak and @Dandandan |
Which issue does this PR close?
Closes #6064.
Rationale for this change
slice_and_maybe_filter spends some CPU ticks on vector allocations and can be improved
What changes are included in this PR?
Rewrite slice_and_maybe_filter to avoid excessive vector allocations
Are these changes tested?
Yes
Are there any user-facing changes?
No