Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix metrics regressions #1132

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 10 additions & 20 deletions native/core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1230,21 +1230,20 @@ impl PhysicalPlanner {
)),
))
} else {
// we insert a projection around the hash join in this case
let projection =
let swapped_hash_join =
swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)?;
let swapped_hash_join = Arc::clone(projection.children()[0]);
let mut additional_native_plans = swapped_hash_join
.children()
.iter()
.map(|p| Arc::clone(p))
.collect::<Vec<_>>();
additional_native_plans.push(Arc::clone(&swapped_hash_join));

let mut additional_native_plans = vec![];
if swapped_hash_join.as_any().is::<ProjectionExec>() {
// a projection was added to the hash join
additional_native_plans.push(Arc::clone(swapped_hash_join.children()[0]));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose there is no chance of `children()` being empty here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is the case where we have wrapped the hash join in a projection, so the projection always has the hash join as its child.

}

Ok((
scans,
Arc::new(SparkPlan::new_with_additional(
spark_plan.plan_id,
projection,
swapped_hash_join,
vec![join_params.left, join_params.right],
additional_native_plans,
)),
Expand Down Expand Up @@ -2550,12 +2549,7 @@ mod tests {

assert_eq!("FilterExec", filter_exec.native_plan.name());
assert_eq!(1, filter_exec.children.len());
assert_eq!(1, filter_exec.additional_native_plans.len());
assert_eq!("ScanExec", filter_exec.additional_native_plans[0].name());

let scan_exec = &filter_exec.children()[0];
assert_eq!("ScanExec", scan_exec.native_plan.name());
assert_eq!(0, scan_exec.additional_native_plans.len());
assert_eq!(0, filter_exec.additional_native_plans.len());
}

#[test]
Expand All @@ -2581,10 +2575,6 @@ mod tests {
assert_eq!(2, hash_join_exec.children.len());
assert_eq!("ScanExec", hash_join_exec.children[0].native_plan.name());
assert_eq!("ScanExec", hash_join_exec.children[1].native_plan.name());

assert_eq!(2, hash_join_exec.additional_native_plans.len());
assert_eq!("ScanExec", hash_join_exec.additional_native_plans[0].name());
assert_eq!("ScanExec", hash_join_exec.additional_native_plans[1].name());
}

fn create_bound_reference(index: i32) -> Expr {
Expand Down
8 changes: 2 additions & 6 deletions native/core/src/execution/datafusion/spark_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::execution::operators::{CopyExec, ScanExec};
use crate::execution::operators::CopyExec;
use arrow_schema::SchemaRef;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
Expand All @@ -32,7 +32,7 @@ pub(crate) struct SparkPlan {
/// Child Spark plans
pub(crate) children: Vec<Arc<SparkPlan>>,
/// Additional native plans that were generated for this Spark plan that we need
/// to collect metrics for (such as CopyExec and ScanExec)
/// to collect metrics for
pub(crate) additional_native_plans: Vec<Arc<dyn ExecutionPlan>>,
}

Expand Down Expand Up @@ -94,9 +94,5 @@ fn collect_additional_plans(
) {
if child.as_any().is::<CopyExec>() {
additional_native_plans.push(Arc::clone(&child));
// CopyExec may be wrapping a ScanExec
collect_additional_plans(Arc::clone(child.children()[0]), additional_native_plans);
} else if child.as_any().is::<ScanExec>() {
additional_native_plans.push(Arc::clone(&child));
}
}
Loading