
Row AVG accumulator support Decimal type #5973

Merged (3 commits), Apr 13, 2023

Conversation

@mingmwang mingmwang (Contributor) commented Apr 12, 2023

Which issue does this PR close?

Closes #5892.

Rationale for this change

Improve aggregate performance for the Decimal type.

What changes are included in this PR?

Are these changes tested?

I tested this on my local Mac.
For TPC-H q17, there is about a 25% improvement.

Before this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 17 iteration 0 took 2623.2 ms and returned 1 rows
Query 17 iteration 1 took 2580.4 ms and returned 1 rows
Query 17 iteration 2 took 2575.7 ms and returned 1 rows
Query 17 avg time: 2593.11 ms

After this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 17 iteration 0 took 1920.4 ms and returned 1 rows
Query 17 iteration 1 took 1877.0 ms and returned 1 rows
Query 17 iteration 2 took 1878.1 ms and returned 1 rows
Query 17 avg time: 1891.81 ms
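As a quick sanity check (not part of the PR), the q17 averages above imply roughly a 27% reduction in query time, consistent with the approximately 25% improvement claimed:

```rust
fn main() {
    // Average times reported above for TPC-H q17 (milliseconds).
    let before = 2593.11_f64;
    let after = 1891.81_f64;
    let improvement_pct = (before - after) / before * 100.0;
    // Roughly 27%, in line with the ~25% improvement noted above.
    assert!((improvement_pct - 27.0).abs() < 1.0);
    println!("{improvement_pct:.1}% faster");
}
```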

For TPC-H q18, there seems to be little improvement.

Before this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(18), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 18 iteration 0 took 1359.0 ms and returned 57 rows
Query 18 iteration 1 took 1285.2 ms and returned 57 rows
Query 18 iteration 2 took 1278.0 ms and returned 57 rows
Query 18 avg time: 1307.40 ms

After this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(18), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 18 iteration 0 took 1335.8 ms and returned 57 rows
Query 18 iteration 1 took 1257.7 ms and returned 57 rows
Query 18 iteration 2 took 1237.9 ms and returned 57 rows
Query 18 avg time: 1277.12 ms

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt) labels Apr 12, 2023
@mingmwang mingmwang requested review from yjshen and alamb April 12, 2023 09:39
@alamb alamb changed the title Row accumulator support Decimal type Row AVG accumulator support Decimal type Apr 12, 2023
@alamb alamb (Contributor) left a comment

Looks good to me -- @yjshen could you please review the changes to the row format?

DataType::Decimal128(p, s) => {
match accessor.get_u64_opt(self.state_index()) {
None => Ok(ScalarValue::Decimal128(None, p, s)),
Some(0) => Ok(ScalarValue::Decimal128(None, p, s)),
Contributor

Why do we translate 0 --> null here? (It is also done for Float64 below.)

I see you are just following the existing pattern, but it seems like this could be incorrect?

Maybe we could add a test that calls AVG on (-1 and 1) to see if we get 0 or NULL

Contributor Author

I will do some tests on this today.

Contributor

Hi @alamb, the value at the state_index is for the count rather than the sum. When the count is 0, for the average, it should be NULL.
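The behavior described above can be sketched as follows. This is a minimal illustration, not DataFusion's actual RowAccumulator/RowAccessor code; the `finalize_avg` helper is hypothetical. It also covers the AVG over (-1, 1) case raised above: the sum is 0 but the count is 2, so the result is 0, not NULL.

```rust
// Minimal sketch of how an AVG row accumulator can finalize its state.
// `count` plays the role of the value stored at state_index; when no rows
// were accumulated, the average is NULL (None), never 0.
fn finalize_avg(sum: i128, count: u64) -> Option<i128> {
    if count == 0 {
        None // no input rows: AVG is NULL
    } else {
        Some(sum / count as i128)
    }
}

fn main() {
    // AVG over (-1, 1): sum is 0 but count is 2, so the result is Some(0).
    assert_eq!(finalize_avg(0, 2), Some(0));
    // AVG over an empty group: count is 0, so the result is NULL.
    assert_eq!(finalize_avg(0, 0), None);
    println!("ok");
}
```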

@mingmwang (Contributor, Author)

For query 18, I think the plan is problematic: the issue is the join order and build-side selection; the bottleneck is not the aggregations.

=== Physical plan with metrics ===
SortExec: expr=[o_totalprice@4 DESC,o_orderdate@3 ASC NULLS LAST], metrics=[output_rows=57, elapsed_compute=11.461µs, spill_count=0, spilled_bytes=0]
  AggregateExec: mode=Single, gby=[c_name@1 as c_name, c_custkey@0 as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@4 as o_orderdate, o_totalprice@3 as o_totalprice], aggr=[SUM(lineitem.l_quantity)], metrics=[output_rows=57, elapsed_compute=49.165µs, spill_count=0, spilled_bytes=0, mem_used=0]
    CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=399, elapsed_compute=2.681µs, spill_count=0, spilled_bytes=0, mem_used=0]
      HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })], metrics=[output_rows=456, input_rows=456, input_batches=2, build_input_batches=733, build_input_rows=6001215, output_batches=2, build_mem_used=806510152, build_time=676.460128ms, join_time=2.88546ms]
        ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, o_orderkey@2 as o_orderkey, o_totalprice@3 as o_totalprice, o_orderdate@4 as o_orderdate, l_quantity@6 as l_quantity], metrics=[output_rows=6001215, elapsed_compute=116.294µs, spill_count=0, spilled_bytes=0, mem_used=0]
          CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=6001215, elapsed_compute=50.883µs, spill_count=0, spilled_bytes=0, mem_used=0]
            HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })], metrics=[output_rows=6001215, input_rows=6001215, input_batches=733, build_input_batches=184, build_input_rows=1500000, output_batches=733, build_mem_used=177373616, build_time=154.423989ms, join_time=268.907576ms]
              ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, o_orderkey@2 as o_orderkey, o_totalprice@4 as o_totalprice, o_orderdate@5 as o_orderdate], metrics=[output_rows=1500000, elapsed_compute=34.671µs, spill_count=0, spilled_bytes=0, mem_used=0]
                CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1500000, elapsed_compute=14.97µs, spill_count=0, spilled_bytes=0, mem_used=0]
                  HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })], metrics=[output_rows=1500000, input_rows=1500000, input_batches=184, build_input_batches=19, build_input_rows=150000, output_batches=184, build_mem_used=14652720, build_time=7.173373ms, join_time=64.614167ms]
                    ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data/customer/part-0.parquet]]}, projection=[c_custkey, c_name], metrics=[output_rows=150000, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, pushdown_rows_filtered=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, row_groups_pruned=0, bytes_scanned=566600, page_index_rows_filtered=0, pushdown_eval_time=2ns, time_elapsed_scanning_total=5.384791ms, time_elapsed_processing=5.024293ms, time_elapsed_scanning_until_data=2.83875ms, time_elapsed_opening=787.167µs, page_index_eval_time=2ns]
                    ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data/orders/part-0.parquet]]}, projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate], metrics=[output_rows=1500000, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, pushdown_rows_filtered=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, row_groups_pruned=0, bytes_scanned=13916402, page_index_rows_filtered=0, pushdown_eval_time=2ns, time_elapsed_scanning_total=112.261704ms, time_elapsed_processing=45.363041ms, time_elapsed_scanning_until_data=6.063625ms, time_elapsed_opening=549.5µs, page_index_eval_time=2ns]
              ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data/lineitem/part-0.parquet]]}, projection=[l_orderkey, l_quantity], metrics=[output_rows=6001215, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, pushdown_rows_filtered=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, row_groups_pruned=0, bytes_scanned=12170874, page_index_rows_filtered=0, pushdown_eval_time=2ns, time_elapsed_scanning_total=324.056333ms, time_elapsed_processing=50.1033ms, time_elapsed_scanning_until_data=2.464666ms, time_elapsed_opening=2.0565ms, page_index_eval_time=2ns]
        ProjectionExec: expr=[l_orderkey@0 as l_orderkey], metrics=[output_rows=57, elapsed_compute=458ns, spill_count=0, spilled_bytes=0, mem_used=0]
          CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=57, elapsed_compute=19.342µs, spill_count=0, spilled_bytes=0, mem_used=0]
            FilterExec: SUM(lineitem.l_quantity)@1 > Some(30000),25,2, metrics=[output_rows=57, elapsed_compute=1.093341ms, spill_count=0, spilled_bytes=0, mem_used=0]
              AggregateExec: mode=Single, gby=[l_orderkey@0 as l_orderkey], aggr=[SUM(lineitem.l_quantity)], metrics=[output_rows=1500000, elapsed_compute=434.341183ms, spill_count=0, spilled_bytes=0, mem_used=0]
                ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data/lineitem/part-0.parquet]]}, projection=[l_orderkey, l_quantity], metrics=[output_rows=6001215, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, pushdown_rows_filtered=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, row_groups_pruned=0, bytes_scanned=12170874, page_index_rows_filtered=0, pushdown_eval_time=2ns, time_elapsed_scanning_total=443.876134ms, time_elapsed_processing=49.172503ms, time_elapsed_scanning_until_data=2.226959ms, time_elapsed_opening=844.959µs, page_index_eval_time=2ns]

@yjshen yjshen (Member) left a comment

Thanks @mingmwang @yahoNanJing! Looks great to me.

Could we also add two roundtrip (DecimalArray -> vec<u8> -> DecimalArray) tests in datafusion/row/src/lib.rs for the newly introduced Decimal type? One for a null-free case and one for a nullable case.
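A generic version of such a roundtrip test can be sketched as below. This is not the datafusion/row API; the `encode`/`decode` helpers are hypothetical stand-ins that serialize `Option<i128>` values (the backing type of Decimal128) with a simple per-value validity byte, just to illustrate the null-free and nullable cases:

```rust
// Illustrative Decimal128 roundtrip (values -> Vec<u8> -> values).
// NOT the datafusion/row format: each Option<i128> becomes one validity
// byte followed by 16 little-endian value bytes.
fn encode(values: &[Option<i128>]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(values.len() * 17);
    for v in values {
        match v {
            Some(x) => {
                buf.push(1); // valid
                buf.extend_from_slice(&x.to_le_bytes());
            }
            None => {
                buf.push(0); // null; value bytes are padding
                buf.extend_from_slice(&[0u8; 16]);
            }
        }
    }
    buf
}

fn decode(buf: &[u8]) -> Vec<Option<i128>> {
    buf.chunks_exact(17)
        .map(|c| {
            if c[0] == 1 {
                Some(i128::from_le_bytes(c[1..17].try_into().unwrap()))
            } else {
                None
            }
        })
        .collect()
}

fn main() {
    // Null-free case.
    let dense = vec![Some(12345_i128), Some(-67890)];
    assert_eq!(decode(&encode(&dense)), dense);
    // Nullable case.
    let sparse = vec![Some(1_i128), None, Some(-2)];
    assert_eq!(decode(&encode(&sparse)), sparse);
    println!("ok");
}
```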

@mingmwang (Contributor, Author)

> Thanks @mingmwang @yahoNanJing! Looks great to me.
>
> Could we also add two roundtrip (DecimalArray -> vec<u8> -> DecimalArray) tests in datafusion/row/src/lib.rs for the newly introduced Decimal type? One for a null-free case and one for a nullable case.

Sure, I will add the two tests in a follow-up PR.

@mingmwang mingmwang merged commit fcd8b89 into apache:main Apr 13, 2023
@alamb alamb (Contributor) commented Apr 13, 2023

🎉

Successfully merging this pull request may close these issues.

Make RowAccumulator support Decimal