fix: Various metrics bug fixes and improvements (#1111)
andygrove authored Dec 2, 2024
1 parent 5400fd7 commit ebdde77
Showing 7 changed files with 372 additions and 79 deletions.
1 change: 1 addition & 0 deletions native/core/src/execution/datafusion/mod.rs
@@ -21,4 +21,5 @@ pub mod expressions;
mod operators;
pub mod planner;
pub mod shuffle_writer;
pub(crate) mod spark_plan;
mod util;
295 changes: 230 additions & 65 deletions native/core/src/execution/datafusion/planner.rs

Large diffs are not rendered by default.

102 changes: 102 additions & 0 deletions native/core/src/execution/datafusion/spark_plan.rs
@@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::execution::operators::{CopyExec, ScanExec};
use arrow_schema::SchemaRef;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

/// Wrapper around a native plan that maps to a Spark plan and can optionally contain
/// references to other native plans that should contribute to the Spark SQL metrics
/// for the root plan (such as CopyExec and ScanExec nodes)
#[derive(Debug, Clone)]
pub(crate) struct SparkPlan {
    /// Spark plan ID (used for informational purposes only)
    pub(crate) plan_id: u32,
    /// The root of the native plan that was generated for this Spark plan
    pub(crate) native_plan: Arc<dyn ExecutionPlan>,
    /// Child Spark plans
    pub(crate) children: Vec<Arc<SparkPlan>>,
    /// Additional native plans that were generated for this Spark plan that we need
    /// to collect metrics for (such as CopyExec and ScanExec)
    pub(crate) additional_native_plans: Vec<Arc<dyn ExecutionPlan>>,
}

impl SparkPlan {
    /// Create a SparkPlan that consists of a single native plan
    pub(crate) fn new(
        plan_id: u32,
        native_plan: Arc<dyn ExecutionPlan>,
        children: Vec<Arc<SparkPlan>>,
    ) -> Self {
        let mut additional_native_plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
        for child in &children {
            collect_additional_plans(Arc::clone(&child.native_plan), &mut additional_native_plans);
        }
        Self {
            plan_id,
            native_plan,
            children,
            additional_native_plans,
        }
    }

    /// Create a SparkPlan that consists of more than one native plan
    pub(crate) fn new_with_additional(
        plan_id: u32,
        native_plan: Arc<dyn ExecutionPlan>,
        children: Vec<Arc<SparkPlan>>,
        additional_native_plans: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        let mut accum: Vec<Arc<dyn ExecutionPlan>> = vec![];
        for plan in &additional_native_plans {
            accum.push(Arc::clone(plan));
        }
        for child in &children {
            collect_additional_plans(Arc::clone(&child.native_plan), &mut accum);
        }
        Self {
            plan_id,
            native_plan,
            children,
            additional_native_plans: accum,
        }
    }

    /// Get the schema of the native plan
    pub(crate) fn schema(&self) -> SchemaRef {
        self.native_plan.schema()
    }

    /// Get the child SparkPlan instances
    pub(crate) fn children(&self) -> &Vec<Arc<SparkPlan>> {
        &self.children
    }
}

fn collect_additional_plans(
    child: Arc<dyn ExecutionPlan>,
    additional_native_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
) {
    if child.as_any().is::<CopyExec>() {
        additional_native_plans.push(Arc::clone(&child));
        // CopyExec may be wrapping a ScanExec
        collect_additional_plans(Arc::clone(child.children()[0]), additional_native_plans);
    } else if child.as_any().is::<ScanExec>() {
        additional_native_plans.push(Arc::clone(&child));
    }
}
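
As an illustration only (not part of this commit), the sketch below shows how a planner might wrap native operators in SparkPlan nodes. It assumes it is compiled inside the same crate module (SparkPlan is pub(crate)), substitutes DataFusion's EmptyExec for Comet's native operators, and the function name build_example_plan is hypothetical.

// Hypothetical usage sketch (not from this commit): wrap a two-node native
// plan in SparkPlan nodes, using EmptyExec as a stand-in for real Comet operators.
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan};

fn build_example_plan() -> SparkPlan {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Leaf Spark plan (e.g. a scan): no children, no additional native plans.
    let scan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    let leaf = Arc::new(SparkPlan::new(1, scan, vec![]));

    // Parent Spark plan (e.g. a projection). If the planner had inserted helper
    // operators such as CopyExec, it would pass them via `new_with_additional`
    // so their metrics roll up into this node's Spark SQL metrics.
    let root: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
    SparkPlan::new(2, root, vec![leaf])
}

The plan IDs used here (1 and 2) stand for the IDs that the Scala side now serializes via setPlanId, so the native explain output can reference the originating Spark plan node.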
17 changes: 10 additions & 7 deletions native/core/src/execution/jni_api.rs
@@ -24,7 +24,7 @@ use datafusion::{
disk_manager::DiskManagerConfig,
runtime_env::{RuntimeConfig, RuntimeEnv},
},
physical_plan::{display::DisplayableExecutionPlan, ExecutionPlan, SendableRecordBatchStream},
physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
prelude::{SessionConfig, SessionContext},
};
use futures::poll;
@@ -59,6 +59,7 @@ use jni::{
};
use tokio::runtime::Runtime;

use crate::execution::datafusion::spark_plan::SparkPlan;
use crate::execution::operators::ScanExec;
use log::info;

@@ -69,7 +70,7 @@ struct ExecutionContext {
/// The deserialized Spark plan
pub spark_plan: Operator,
/// The DataFusion root operator converted from the `spark_plan`
pub root_op: Option<Arc<dyn ExecutionPlan>>,
pub root_op: Option<Arc<SparkPlan>>,
/// The input sources for the DataFusion plan
pub scans: Vec<ScanExec>,
/// The global reference of input sources for the DataFusion plan
@@ -360,7 +361,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(

if exec_context.explain_native {
let formatted_plan_str =
DisplayableExecutionPlan::new(root_op.as_ref()).indent(true);
DisplayableExecutionPlan::new(root_op.native_plan.as_ref()).indent(true);
info!("Comet native query plan:\n{formatted_plan_str:}");
}

@@ -369,6 +370,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
.root_op
.as_ref()
.unwrap()
.native_plan
.execute(0, task_ctx)?;
exec_context.stream = Some(stream);
} else {
@@ -400,12 +402,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
if exec_context.explain_native {
if let Some(plan) = &exec_context.root_op {
let formatted_plan_str =
DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent(true);
DisplayableExecutionPlan::with_metrics(plan.native_plan.as_ref())
.indent(true);
info!(
"Comet native query plan with metrics:\
\n[Stage {} Partition {}] plan creation (including CometScans fetching first batches) took {:?}:\
"Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
\n plan creation (including CometScans fetching first batches) took {:?}:\
\n{formatted_plan_str:}",
stage_id, partition, exec_context.plan_creation_time
plan.plan_id, stage_id, partition, exec_context.plan_creation_time
);
}
}
31 changes: 25 additions & 6 deletions native/core/src/execution/metrics/utils.rs
@@ -15,31 +15,50 @@
// specific language governing permissions and limitations
// under the License.

use crate::execution::datafusion::spark_plan::SparkPlan;
use crate::jvm_bridge::jni_new_global_ref;
use crate::{
errors::CometError,
jvm_bridge::{jni_call, jni_new_string},
};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::MetricValue;
use jni::objects::{GlobalRef, JString};
use jni::{objects::JObject, JNIEnv};
use std::collections::HashMap;
use std::sync::Arc;

/// Updates the metrics of a CometMetricNode. This function is called recursively to
/// update the metrics of all the children nodes. The metrics are pulled from the
/// DataFusion execution plan and pushed to the Java side through JNI.
/// native execution plan and pushed to the Java side through JNI.
pub fn update_comet_metric(
    env: &mut JNIEnv,
    metric_node: &JObject,
    execution_plan: &Arc<dyn ExecutionPlan>,
    spark_plan: &Arc<SparkPlan>,
    metrics_jstrings: &mut HashMap<String, Arc<GlobalRef>>,
) -> Result<(), CometError> {
    // combine all metrics from all native plans for this SparkPlan
    let metrics = if spark_plan.additional_native_plans.is_empty() {
        spark_plan.native_plan.metrics()
    } else {
        let mut metrics = spark_plan.native_plan.metrics().unwrap_or_default();
        for plan in &spark_plan.additional_native_plans {
            let additional_metrics = plan.metrics().unwrap_or_default();
            for c in additional_metrics.iter() {
                match c.value() {
                    MetricValue::OutputRows(_) => {
                        // we do not want to double count output rows
                    }
                    _ => metrics.push(c.to_owned()),
                }
            }
        }
        Some(metrics.aggregate_by_name())
    };

    update_metrics(
        env,
        metric_node,
        &execution_plan
            .metrics()
        &metrics
            .unwrap_or_default()
            .iter()
            .map(|m| m.value())
@@ -49,7 +68,7 @@ pub fn update_comet_metric(
    )?;

    unsafe {
        for (i, child_plan) in execution_plan.children().iter().enumerate() {
        for (i, child_plan) in spark_plan.children().iter().enumerate() {
            let child_metric_node: JObject = jni_call!(env,
                comet_metric_node(metric_node).get_child_node(i as i32) -> JObject
            )?;
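
The combining rule above can be exercised in isolation. The sketch below is our own illustration, not code from this commit: combine_metrics and the sample metric values are hypothetical, but it uses the same DataFusion metrics API (MetricsSet, MetricValue, aggregate_by_name) as update_comet_metric.

// Standalone sketch (not from this commit) of the metric-combining rule used in
// update_comet_metric: merge metrics from additional native plans into the
// primary plan's metrics, skip their output_rows to avoid double counting,
// then aggregate metrics that share a name.
use std::time::Duration;

use datafusion::physical_plan::metrics::{
    ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};

fn combine_metrics(primary: MetricsSet, additional: &[MetricsSet]) -> MetricsSet {
    let mut combined = primary;
    for set in additional {
        for m in set.iter() {
            match m.value() {
                // skip output rows reported by helper operators (CopyExec/ScanExec)
                MetricValue::OutputRows(_) => {}
                _ => combined.push(m.to_owned()),
            }
        }
    }
    combined.aggregate_by_name()
}

fn main() {
    // Simulate a primary native plan and one helper plan, each reporting metrics.
    let primary = ExecutionPlanMetricsSet::new();
    MetricBuilder::new(&primary).output_rows(0).add(100);
    MetricBuilder::new(&primary)
        .elapsed_compute(0)
        .add_duration(Duration::from_nanos(5_000));

    let helper = ExecutionPlanMetricsSet::new();
    // The helper's output_rows would double count the rows, so it is dropped.
    MetricBuilder::new(&helper).output_rows(0).add(100);
    MetricBuilder::new(&helper)
        .elapsed_compute(0)
        .add_duration(Duration::from_nanos(2_000));

    let combined = combine_metrics(primary.clone_inner(), &[helper.clone_inner()]);
    // expected: output_rows stays at 100, elapsed_compute sums to 7_000 ns
    println!("output_rows: {:?}", combined.output_rows());
    println!("elapsed_compute: {:?}", combined.elapsed_compute());
}

In the commit itself, the combined set is then flattened to (name, value) pairs and pushed through JNI to the matching CometMetricNode, as the surrounding diff shows.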
3 changes: 3 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -31,6 +31,9 @@ message Operator {
// The child operators of this
repeated Operator children = 1;

// Spark plan ID
uint32 plan_id = 2;

oneof op_struct {
Scan scan = 100;
Projection projection = 101;
@@ -2508,7 +2508,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
*/
def operator2Proto(op: SparkPlan, childOp: Operator*): Option[Operator] = {
val conf = op.conf
val result = OperatorOuterClass.Operator.newBuilder()
val result = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
childOp.foreach(result.addChildren)

op match {
