Skip to content

Commit

Permalink
Add missing GpuMetric wrapping/unwrapping
Browse files (browse the repository at this point in the history)
  • Loading branch information
jihoonson committed Jan 7, 2025
1 parent c528662 commit 2d2c09c
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy, UnaryExecNode}
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.GpuWriteJobStatsTracker
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -106,11 +105,11 @@ case class GpuRapidsDeltaWriteExec(child: SparkPlan) extends V2CommandExec
with UnaryExecNode with GpuExec {
override def output: Seq[Attribute] = child.output

lazy val basicMetrics: Map[String, SQLMetric] = GpuWriteJobStatsTracker.basicMetrics
lazy val taskMetrics: Map[String, SQLMetric] = GpuWriteJobStatsTracker.taskMetrics
lazy val basicMetrics: Map[String, GpuMetric] = GpuWriteJobStatsTracker.basicMetrics
lazy val taskMetrics: Map[String, GpuMetric] = GpuWriteJobStatsTracker.taskMetrics

override lazy val allMetrics: Map[String, GpuMetric] =
GpuMetric.wrap(basicMetrics ++ taskMetrics)
basicMetrics ++ taskMetrics

override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
// This is just a stub node for planning purposes and does not actually perform
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
Expand Down Expand Up @@ -179,8 +179,8 @@ class GpuOptimisticTransaction
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val basicWriteJobStatsTracker = new BasicColumnarWriteJobStatsTracker(
serializableHadoopConf,
BasicWriteJobStatsTracker.metrics)
registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
GpuMetric.wrap(BasicWriteJobStatsTracker.metrics))
registerSQLMetrics(spark, GpuMetric.unwrap(basicWriteJobStatsTracker.driverSideMetrics))
statsTrackers.append(basicWriteJobStatsTracker)
gpuRapidsWrite.foreach { grw =>
val tracker = new GpuWriteJobStatsTracker(serializableHadoopConf,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
Expand Down Expand Up @@ -179,8 +179,8 @@ class GpuOptimisticTransaction
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val basicWriteJobStatsTracker = new BasicColumnarWriteJobStatsTracker(
serializableHadoopConf,
BasicWriteJobStatsTracker.metrics)
registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
GpuMetric.wrap(BasicWriteJobStatsTracker.metrics))
registerSQLMetrics(spark, GpuMetric.unwrap(basicWriteJobStatsTracker.driverSideMetrics))
statsTrackers.append(basicWriteJobStatsTracker)
gpuRapidsWrite.foreach { grw =>
val tracker = new GpuWriteJobStatsTracker(serializableHadoopConf,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
Expand Down Expand Up @@ -201,8 +201,8 @@ class GpuOptimisticTransaction
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val basicWriteJobStatsTracker = new BasicColumnarWriteJobStatsTracker(
serializableHadoopConf,
BasicWriteJobStatsTracker.metrics)
registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
GpuMetric.wrap(BasicWriteJobStatsTracker.metrics))
registerSQLMetrics(spark, GpuMetric.unwrap(basicWriteJobStatsTracker.driverSideMetrics))
statsTrackers.append(basicWriteJobStatsTracker)
gpuRapidsWrite.foreach { grw =>
val tracker = new GpuWriteJobStatsTracker(serializableHadoopConf,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
Expand Down Expand Up @@ -201,8 +201,8 @@ class GpuOptimisticTransaction
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val basicWriteJobStatsTracker = new BasicColumnarWriteJobStatsTracker(
serializableHadoopConf,
BasicWriteJobStatsTracker.metrics)
registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
GpuMetric.wrap(BasicWriteJobStatsTracker.metrics))
registerSQLMetrics(spark, GpuMetric.unwrap(basicWriteJobStatsTracker.driverSideMetrics))
statsTrackers.append(basicWriteJobStatsTracker)
gpuRapidsWrite.foreach { grw =>
val tracker = new GpuWriteJobStatsTracker(serializableHadoopConf,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
Expand Down Expand Up @@ -203,8 +203,8 @@ class GpuOptimisticTransaction
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val basicWriteJobStatsTracker = new BasicColumnarWriteJobStatsTracker(
serializableHadoopConf,
BasicWriteJobStatsTracker.metrics)
registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
GpuMetric.wrap(BasicWriteJobStatsTracker.metrics))
registerSQLMetrics(spark, GpuMetric.unwrap(basicWriteJobStatsTracker.driverSideMetrics))
statsTrackers.append(basicWriteJobStatsTracker)
gpuRapidsWrite.foreach { grw =>
val tracker = new GpuWriteJobStatsTracker(serializableHadoopConf,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
Expand Down Expand Up @@ -220,8 +220,8 @@ class GpuOptimisticTransaction(
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val basicWriteJobStatsTracker = new BasicColumnarWriteJobStatsTracker(
serializableHadoopConf,
BasicWriteJobStatsTracker.metrics)
registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
GpuMetric.wrap(BasicWriteJobStatsTracker.metrics))
registerSQLMetrics(spark, GpuMetric.unwrap(basicWriteJobStatsTracker.driverSideMetrics))
statsTrackers.append(basicWriteJobStatsTracker)
gpuRapidsWrite.foreach { grw =>
val tracker = new GpuWriteJobStatsTracker(serializableHadoopConf,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
Expand Down Expand Up @@ -217,8 +217,8 @@ class GpuOptimisticTransaction(
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val basicWriteJobStatsTracker = new BasicColumnarWriteJobStatsTracker(
serializableHadoopConf,
BasicWriteJobStatsTracker.metrics)
registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
GpuMetric.wrap(BasicWriteJobStatsTracker.metrics))
registerSQLMetrics(spark, GpuMetric.unwrap(basicWriteJobStatsTracker.driverSideMetrics))
statsTrackers.append(basicWriteJobStatsTracker)
gpuRapidsWrite.foreach { grw =>
val tracker = new GpuWriteJobStatsTracker(serializableHadoopConf,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
Expand Down Expand Up @@ -219,8 +219,8 @@ class GpuOptimisticTransaction(
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val basicWriteJobStatsTracker = new BasicColumnarWriteJobStatsTracker(
serializableHadoopConf,
BasicWriteJobStatsTracker.metrics)
registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
GpuMetric.wrap(BasicWriteJobStatsTracker.metrics))
registerSQLMetrics(spark, GpuMetric.unwrap(basicWriteJobStatsTracker.driverSideMetrics))
statsTrackers.append(basicWriteJobStatsTracker)
gpuRapidsWrite.foreach { grw =>
val tracker = new GpuWriteJobStatsTracker(serializableHadoopConf,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
Expand Down Expand Up @@ -219,8 +219,8 @@ class GpuOptimisticTransaction(
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val basicWriteJobStatsTracker = new BasicColumnarWriteJobStatsTracker(
serializableHadoopConf,
BasicWriteJobStatsTracker.metrics)
registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
GpuMetric.wrap(BasicWriteJobStatsTracker.metrics))
registerSQLMetrics(spark, GpuMetric.unwrap(basicWriteJobStatsTracker.driverSideMetrics))
statsTrackers.append(basicWriteJobStatsTracker)
gpuRapidsWrite.foreach { grw =>
val tracker = new GpuWriteJobStatsTracker(serializableHadoopConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,16 @@ trait GpuExec extends SparkPlan {
GpuMetric.create(level, name)

def createNanoTimingMetric(level: MetricsLevel, name: String): GpuMetric =
GpuMetric.createNanoTimingMetric(level, name)
GpuMetric.createNanoTiming(level, name)

def createSizeMetric(level: MetricsLevel, name: String): GpuMetric =
GpuMetric.createSizeMetric(level, name)
GpuMetric.createSize(level, name)

def createAverageMetric(level: MetricsLevel, name: String): GpuMetric =
GpuMetric.createAverageMetric(level, name)
GpuMetric.createAverage(level, name)

def createTimingMetric(level: MetricsLevel, name: String): GpuMetric =
GpuMetric.createTimingMetric(level, name)
GpuMetric.createTiming(level, name)

protected def createFileCacheMetrics(): Map[String, GpuMetric] = {
if (FileCacheConf.FILECACHE_ENABLED.get(conf)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,16 @@ object GpuMetric extends Logging with SQLConfHelper {
def create(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createMetric(sparkContext, name))

def createNanoTimingMetric(level: MetricsLevel, name: String): GpuMetric =
def createNanoTiming(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createNanoTimingMetric(sparkContext, name))

def createSizeMetric(level: MetricsLevel, name: String): GpuMetric =
def createSize(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createSizeMetric(sparkContext, name))

def createAverageMetric(level: MetricsLevel, name: String): GpuMetric =
def createAverage(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createAverageMetric(sparkContext, name))

def createTimingMetric(level: MetricsLevel, name: String): GpuMetric =
def createTiming(level: MetricsLevel, name: String): GpuMetric =
createInternal(level, SQLMetrics.createTimingMetric(sparkContext, name))

def unwrap(input: GpuMetric): SQLMetric = input match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,15 @@ object BasicColumnarWriteJobStatsTracker {
Map(
NUM_FILES_KEY -> GpuMetric.create(GpuMetric.ESSENTIAL_LEVEL,
"number of written files"),
NUM_OUTPUT_BYTES_KEY -> GpuMetric.createSizeMetric(GpuMetric.ESSENTIAL_LEVEL,
NUM_OUTPUT_BYTES_KEY -> GpuMetric.createSize(GpuMetric.ESSENTIAL_LEVEL,
"written output"),
NUM_OUTPUT_ROWS_KEY -> GpuMetric.create(GpuMetric.ESSENTIAL_LEVEL,
"number of output rows"),
NUM_PARTS_KEY -> GpuMetric.create(GpuMetric.ESSENTIAL_LEVEL,
"number of dynamic part"),
TASK_COMMIT_TIME -> GpuMetric.createTimingMetric(GpuMetric.ESSENTIAL_LEVEL,
TASK_COMMIT_TIME -> GpuMetric.createTiming(GpuMetric.ESSENTIAL_LEVEL,
"task commit time"),
JOB_COMMIT_TIME -> GpuMetric.createTimingMetric(GpuMetric.ESSENTIAL_LEVEL,
JOB_COMMIT_TIME -> GpuMetric.createTiming(GpuMetric.ESSENTIAL_LEVEL,
"job commit time")
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,17 @@ object GpuWriteJobStatsTracker {

def taskMetrics: Map[String, GpuMetric] = {
Map(
GPU_TIME_KEY -> GpuMetric.createNanoTimingMetric(GpuMetric.ESSENTIAL_LEVEL, "GPU time"),
WRITE_TIME_KEY -> GpuMetric.createNanoTimingMetric(GpuMetric.ESSENTIAL_LEVEL,
GPU_TIME_KEY -> GpuMetric.createNanoTiming(GpuMetric.ESSENTIAL_LEVEL, "GPU time"),
WRITE_TIME_KEY -> GpuMetric.createNanoTiming(GpuMetric.ESSENTIAL_LEVEL,
"write time"),
TASK_COMMIT_TIME -> basicMetrics(TASK_COMMIT_TIME),
ASYNC_WRITE_TOTAL_THROTTLE_TIME_KEY -> GpuMetric.createNanoTimingMetric(
ASYNC_WRITE_TOTAL_THROTTLE_TIME_KEY -> GpuMetric.createNanoTiming(
GpuMetric.DEBUG_LEVEL, "total throttle time"),
ASYNC_WRITE_AVG_THROTTLE_TIME_KEY -> GpuMetric.createNanoTimingMetric(
ASYNC_WRITE_AVG_THROTTLE_TIME_KEY -> GpuMetric.createNanoTiming(
GpuMetric.DEBUG_LEVEL, "avg throttle time per async write"),
ASYNC_WRITE_MIN_THROTTLE_TIME_KEY -> GpuMetric.createNanoTimingMetric(
ASYNC_WRITE_MIN_THROTTLE_TIME_KEY -> GpuMetric.createNanoTiming(
GpuMetric.DEBUG_LEVEL, "min throttle time per async write"),
ASYNC_WRITE_MAX_THROTTLE_TIME_KEY -> GpuMetric.createNanoTimingMetric(
ASYNC_WRITE_MAX_THROTTLE_TIME_KEY -> GpuMetric.createNanoTiming(
GpuMetric.DEBUG_LEVEL, "max throttle time per async write")
)
}
Expand Down

0 comments on commit 2d2c09c

Please sign in to comment.