From d3119fac0a09a2c6290762c9ba573378ccf30dfc Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Fri, 22 Nov 2024 12:05:22 +0900 Subject: [PATCH] [SPARK-50378][SS] Add custom metric for tracking time spent processing initial state in transformWithState ### What changes were proposed in this pull request? Add custom metric for tracking time spent processing initial state in transformWithState ### Why are the changes needed? Adds tracking for time spent in populating initial state ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit tests ``` [info] Run completed in 2 minutes, 38 seconds. [info] Total number of tests run: 22 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 22, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #48913 from anishshri-db/task/SPARK-50378. Authored-by: Anish Shrigondekar Signed-off-by: Jungtaek Lim --- .../sql/execution/streaming/TransformWithStateExec.scala | 6 ++++++ .../streaming/TransformWithStateInitialStateSuite.scala | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index 2b26d18019d12..107f98b09f858 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -410,6 +410,9 @@ case class TransformWithStateExec( // operator specific metrics override def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = { Seq( + // metrics around initial state + StatefulOperatorCustomSumMetric("initialStateProcessingTimeMs", + "Number of milliseconds taken to process all initial state"), // metrics around state variables 
StatefulOperatorCustomSumMetric("numValueStateVars", "Number of value state variables"), StatefulOperatorCustomSumMetric("numListStateVars", "Number of list state variables"), @@ -655,6 +658,8 @@ case class TransformWithStateExec( statefulProcessor.init(outputMode, timeMode) processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) + val initialStateProcTimeMs = longMetric("initialStateProcessingTimeMs") + val initialStateStartTimeNs = System.nanoTime // Check if is first batch // Only process initial states for first batch if (processorHandle.getQueryInfo().getBatchId == 0) { @@ -667,6 +672,7 @@ case class TransformWithStateExec( processInitialStateRows(keyRow.asInstanceOf[UnsafeRow], valueRowIter) } } + initialStateProcTimeMs += NANOSECONDS.toMillis(System.nanoTime - initialStateStartTimeNs) processDataWithPartition(childDataIterator, store, processorHandle) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala index 360656a76f350..806d2f19f6f5c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala @@ -395,6 +395,10 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest AddData(inputData, InitInputRow("k2", "update", 40.0)), AddData(inputData, InitInputRow("non-exist", "getOption", -1.0)), CheckNewAnswer(("non-exist", "getOption", -1.0)), + Execute { q => + assert(q.lastProgress + .stateOperators(0).customMetrics.get("initialStateProcessingTimeMs") > 0) + }, AddData(inputData, InitInputRow("k1", "appendList", 37.0)), AddData(inputData, InitInputRow("k2", "appendList", 40.0)), AddData(inputData, InitInputRow("non-exist", "getList", -1.0)), @@ -514,6 +518,10 @@ class TransformWithStateInitialStateSuite extends 
StateStoreMetricsTest AdvanceManualClock(1 * 1000), // registered timer for "a" and "b" is 6000, first batch is processed at ts = 1000 CheckNewAnswer(("c", "1")), + Execute { q => + assert(q.lastProgress + .stateOperators(0).customMetrics.get("initialStateProcessingTimeMs") > 0) + }, AddData(inputData, "c"), AdvanceManualClock(6 * 1000), // ts = 7000, "a" expires