Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Apr 12, 2024
1 parent 624e6a5 commit aa7388d
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ import org.apache.spark.sql.streaming.{ListState, TTLConfig}
import org.apache.spark.util.NextIterator

/**
* Provides concrete implementation for list of values associated with a state variable
* used in the streaming transformWithState operator.
* Class that provides a concrete implementation for a list state state associated with state
* variables (with ttl expiration support) used in the streaming transformWithState operator.
*
* @param store - reference to the StateStore instance to be used for storing state
* @param stateName - name of logical state partition
* @param keyEnc - Spark SQL encoder for key
* @param keyExprEnc - Spark SQL encoder for key
* @param valEncoder - Spark SQL encoder for value
* @tparam S - data type of object that will be stored in the list
* @param ttlConfig - TTL configuration for values stored in this state
* @param batchTimestampMs - current batch processing timestamp.
* @tparam S - data type of object that will be stored
*/
class ListStateImplWithTTL[S](
store: StateStore,
Expand Down Expand Up @@ -147,7 +149,6 @@ class ListStateImplWithTTL[S](
}

/**
*
* Loops through all the values associated with the grouping key, and removes
* the expired elements from the list.
* @param groupingKey grouping key for which cleanup should be performed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ trait TTLState {
*
* @param groupingKey grouping key for which cleanup should be performed.
*
* @return true if the state was cleared, false otherwise.
* @return how many state objects were cleaned up.
*/
def clearIfExpired(groupingKey: Array[Byte]): Long
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,15 +255,14 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
}
}


test(s"ttl States are not populated for timeMode=None") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
UUID.randomUUID(), keyExprEncoder, TimeMode.None())

handle.getValueState("testState", Encoders.STRING)
handle.getListState("testState", Encoders.STRING)
handle.getValueState("testValueState", Encoders.STRING)
handle.getListState("testListState", Encoders.STRING)

assert(handle.ttlStates.isEmpty)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ class ListStateTTLProcessor(ttlConfig: TTLConfig)
class TransformWithListStateTTLSuite extends TransformWithStateTTLTest {

import testImplicits._

override def getProcessor(ttlConfig: TTLConfig):
StatefulProcessor[String, InputEvent, OutputEvent] = {
new ListStateTTLProcessor(ttlConfig)
StatefulProcessor[String, InputEvent, OutputEvent] = {
new ListStateTTLProcessor(ttlConfig)
}

override def getStateTTLMetricName: String = "numListStateWithTTLVars"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,10 @@ case class MultipleValueStatesTTLProcessor(
class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest {

import testImplicits._

override def getProcessor(ttlConfig: TTLConfig):
StatefulProcessor[String, InputEvent, OutputEvent] = {
new ValueStateTTLProcessor(ttlConfig)
new ValueStateTTLProcessor(ttlConfig)
}

override def getStateTTLMetricName: String = "numValueStateWithTTLVars"
Expand Down

0 comments on commit aa7388d

Please sign in to comment.