From 624e6a5af0eab5b281c20e617589779dbdb10eb6 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Fri, 12 Apr 2024 11:40:44 -0700 Subject: [PATCH] metric changes --- .../sql/execution/streaming/TransformWithStateExec.scala | 2 ++ .../spark/sql/streaming/TransformWithListStateTTLSuite.scala | 2 ++ .../spark/sql/streaming/TransformWithStateTTLTest.scala | 4 +++- .../spark/sql/streaming/TransformWithValueStateTTLSuite.scala | 1 + 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index be01d55b3f4d3..f5d2610d78d9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -311,6 +311,8 @@ case class TransformWithStateExec( // metrics around TTL StatefulOperatorCustomSumMetric("numValueStateWithTTLVars", "Number of value state variables with TTL"), + StatefulOperatorCustomSumMetric("numListStateWithTTLVars", + "Number of list state variables with TTL"), StatefulOperatorCustomSumMetric("numValuesRemovedDueToTTLExpiry", "Number of values removed due to TTL expiry") ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala index 9ecd50d8e3e44..f868396f23f04 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala @@ -103,6 +103,8 @@ class TransformWithListStateTTLSuite extends TransformWithStateTTLTest { new ListStateTTLProcessor(ttlConfig) } + override def getStateTTLMetricName: String = "numListStateWithTTLVars" + test("verify iterator works with expired values in middle of list") { withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateTTLTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateTTLTest.scala index 40696e8f719c4..9e471d3efc8c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateTTLTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateTTLTest.scala @@ -46,6 +46,8 @@ abstract class TransformWithStateTTLTest def getProcessor(ttlConfig: TTLConfig): StatefulProcessor[String, InputEvent, OutputEvent] + def getStateTTLMetricName: String + test("validate state is evicted at ttl expiry") { withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { @@ -113,7 +115,7 @@ abstract class TransformWithStateTTLTest // for stateful operator val progData = q.recentProgress.filter(prog => prog.stateOperators.size > 0) assert(progData.filter(prog => - prog.stateOperators(0).customMetrics.get("numValueStateWithTTLVars") > 0).size > 0) + prog.stateOperators(0).customMetrics.get(getStateTTLMetricName) > 0).size > 0) assert(progData.filter(prog => prog.stateOperators(0).customMetrics .get("numValuesRemovedDueToTTLExpiry") > 0).size > 0) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala index 4a6535c8100f0..7b9620a362f61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala @@ -168,6 +168,7 @@ class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { new ValueStateTTLProcessor(ttlConfig) } + override def getStateTTLMetricName: String = "numValueStateWithTTLVars" test("validate multiple value states") { withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->