-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Row accumulator support update Scalar values #6003
Row accumulator support update Scalar values #6003
Conversation
Thanks @mingmwang for the recent improvement patches. The benchmark performance is improved around 30% for TPCH SF10.
|
For Q5, there is downgrade between the versions. But I think it is unrelated to my PRs, the aggregation is very lightweight.
|
Q5 Before this PR Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(5), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data10", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: true, enable_scheduler: false } After this PR Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(5), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data10", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: true, enable_scheduler: false } Almost no difference. |
|
||
/// This method is similar to Scalar::try_from_array except for the Null handling. | ||
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)] | ||
fn col_to_scalar( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can replace this function with
/// This method is similar to Scalar::try_from_array except for the Null handling.
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
fn col_to_scalar(
array: &ArrayRef,
filter: &Option<&BooleanArray>,
row_index: usize,
) -> Result<ScalarValue> {
if array.is_null(row_index) {
return Ok(ScalarValue::Null);
}
if let Some(filter) = filter {
if !filter.value(row_index) {
return Ok(ScalarValue::Null);
}
}
let mut res = ScalarValue::try_from_array(array, row_index)?;
if res.is_null() {
res = ScalarValue::Null;
}
Ok(res)
}
ScalarValue::is_null
matches both [ScalarValue::Null]
and [ScalarValue::Type(None)]
.
Thanks, cool PR. Linking #5969 as potentially related |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your ongoing efforts in improving the performance of aggregate functions. Really appreciate it!
If I understand the core concept of this PR correctly: the current aggregate process, when dealing with high-cardinality aggregations, suffers from low efficiency due to the frequent generation of small arrays from slicing. To address this, you've changed the approach for high-cardinality aggregations by generating ScalarValue
or Vec<ScalarValue>
instead of creating arrays through slicing.
Nonetheless, generating ScalarValue
s could still lead to some overhead. As an alternative, I propose the following: we could add a set of methods to arrow-arith
's aggregate
functions, allowing them to accept a selection vector (also suggested by @alamb previously in #5944 (comment)). This way, we can avoid creating slices and generating ScalarValue
at the same time.
if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single) | ||
&& normal_aggr_input_values.is_empty() | ||
&& normal_filter_values.is_empty() | ||
&& groups_with_rows.len() >= batch.num_rows() / 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This magic number 10
is used to identify high cardinality. Shall we make it configurable or document how this 10
is chosen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will make it configurable.
Filed apache/arrow-rs#4095 . |
Especially for high-cardinality aggregations, the query performance will be downgraded due to lots of slicing invoking |
Yes, you are right. I agree that generating |
I will try the following items on the basis of the PR:
|
Agree. We could always revisit it when apache/arrow-rs#4095 get resolved or find another sound approach. Thanks again! |
I had a comment for refactoring |
Yes, I will change accordingly per your comments. |
@yjshen @mustafasrepo @Dandandan And I think after this PR, the major bottleneck of the high cardinality aggregations on DataFusion should be the Cache miss and Memory stall and our CPU instructions per cycle is much lower than DuckDB's (on tpch-q17). |
@mustafasrepo |
let filter_bool_array = row_filter_values | ||
.iter() | ||
.map(|filter_opt| match filter_opt { | ||
Some(f) => Ok(Some(as_boolean_array(f)?)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems you don't need to wrap the result in Ok
/ Result
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the as_boolean_array
returns a Result here.
"GroupedHashAggregate: can't create a scalar from array of type \"{other:?}\"" | ||
))), | ||
let mut res = ScalarValue::try_from_array(array, row_index)?; | ||
if res.is_null() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is_null
check seems redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, agree. Just remove it.
Hi @Dandandan, do you still have any concerns for this PR? |
I'll merge this PR to unblock the further optimizations. If there's any further concerns, we can create a following issue. |
Which issue does this PR close?
Closes #6002 .
Rationale for this change
improve the Aggregator performance when group by high cardinality columns.
What changes are included in this PR?
update_scalar()
method toRowAccumulator
traitAre these changes tested?
I had test this on my local Mac.
For TPCH-q17, there is at about 40% improvement.
Before this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 17 iteration 0 took 1888.9 ms and returned 1 rows
Query 17 iteration 1 took 1851.8 ms and returned 1 rows
Query 17 iteration 2 took 1853.1 ms and returned 1 rows
Query 17 avg time: 1864.61 ms
After this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: true, enable_scheduler: false }
Query 17 iteration 0 took 1377.0 ms and returned 1 rows
Query 17 iteration 1 took 1324.3 ms and returned 1 rows
Query 17 iteration 2 took 1317.7 ms and returned 1 rows
Query 17 avg time: 1339.68 ms.
Are there any user-facing changes?