[SPARK-48589][SQL][SS] Add option snapshotStartBatchId and snapshotPartitionId to state data source #46944
Is it necessary to add an end-to-end test for the options? If so, I can create another PR. The way to construct it is probably by sleeping long enough for the maintenance task to run. @anishshri-db @HeartSaVioR
throw QueryExecutionErrors.failedToReadSnapshotFileNotExistsError(
  snapshotFile(startVersion), toString(), null)
}
synchronized { putStateIntoStateCacheMap(startVersion, startVersionMap.get) }
Is it possible to refactor this with the existing loadMap function, or add a helper function for the shared logic?
For HDFS, it is hard because the common part is really small. But for RocksDB, there is room for refactoring. For example, #46927 is a PR to test whether we can extract a common part of both load functions.
 * @param endVersion checkpoint version to end with
 */
def getStore(startVersion: Long, endVersion: Long): StateStore =
  throw new SparkUnsupportedOperationException("getStore with startVersion and endVersion " +
can we just put nothing here? like
def getStore(version: Long): StateStore
It seems that we cannot: to make this method optional, it has to have a default implementation; otherwise a build error will be thrown.
Hmm - what error do you see here? Can you paste it please?
Assuming users create a custom state store provider and do not implement this method because it is optional, they will see errors like:
Missing implementation for member of trait StateStoreProvider
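The compile-time behavior described above can be reproduced outside Spark with a minimal sketch (the names below are illustrative stand-ins, not Spark's actual API): an abstract method forces every implementor to provide a body, while a default implementation that throws keeps the method optional.

```scala
// Sketch only: simplified stand-ins for StateStoreProvider and getStore.
trait StateStoreProviderSketch {
  // Required: a provider that omits this fails to compile with
  // "Missing implementation for member of trait StateStoreProviderSketch".
  def getStore(version: Long): String

  // Optional: compiles even if a provider ignores it, but fails fast at runtime.
  def getStore(startVersion: Long, endVersion: Long): String =
    throw new UnsupportedOperationException(
      "getStore with startVersion and endVersion is not supported by " + getClass.getName)
}

// A provider that only implements the required method still compiles.
class MinimalProvider extends StateStoreProviderSketch {
  override def getStore(version: Long): String = s"store@$version"
}
```

Removing the default body of the two-argument overload turns it into an abstract member, which produces exactly the build error mentioned above.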
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStore}
import org.apache.spark.sql.execution.streaming.state._
Is this because these three are everything in that package?
No. The reason is that I use three new classes in this package. I think it would be too long to include them all. What do you think?
Yea this should be good
@WweiL
case Some(snapshotStartBatchId) =>
  if (!provider.isInstanceOf[SupportsFineGrainedReplayFromSnapshot]) {
    StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
I guess we've been used to throwing the exception here explicitly rather than having the method throw?
This error will be thrown in two places (the other is in JoinStateManager), so I created a function for it. This way the error also gets its own error class.
 * @param endVersion checkpoint version to end with
 * @return [[HDFSBackedStateStore]]
 */
override def replayStateFromSnapshot(startVersion: Long, endVersion: Long): StateStore = {
Please apply the same change for all methods in this PR: if the meaning of startVersion is actually the snapshot version to begin with, let's use snapshotVersion, which is clearer about the intention.
if (startVersion < 1) {
  throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
}
if (endVersion < startVersion || endVersion < 0) {
I guess we'd like to give a different error for an invalid value (negative) vs. the criteria (endVersion has to be equal to or later than startVersion). The error message wouldn't give the context on why it failed. Users could check the option values by themselves, but ideally it is better to tell them.
Also, the former check already covers the latter one: startVersion has to be equal to or higher than 1, so endVersion also has to be equal to or higher than 1. The latter is only needed when we want to produce a different error for a different pattern of invalid value.
I guess we'd like to give a different error for an invalid value (negative) vs. the criteria (endVersion has to be equal to or later than startVersion). The error message wouldn't give the context on why it failed. Users could check the option values by themselves, but ideally it is better to tell them.
There is a better error message in StateDataSource, where the user's input is verified, and that is the only usage of this function. I think the error message here will not matter too much, since I would not expect users to call this method directly.
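A sketch of the separated validation discussed here, with a distinct message per failure mode (the error text is illustrative, not the wording Spark uses):

```scala
// Reject each invalid input with a message that explains why it failed.
def validateVersionRange(startVersion: Long, endVersion: Long): Unit = {
  if (startVersion < 1) {
    throw new IllegalArgumentException(
      s"startVersion must be >= 1, but got $startVersion")
  }
  if (endVersion < startVersion) {
    throw new IllegalArgumentException(
      s"endVersion ($endVersion) must be >= startVersion ($startVersion)")
  }
  // No endVersion < 0 check is needed: endVersion >= startVersion >= 1 already holds.
}
```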
if (endVersion < startVersion) {
  throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}
rocksDB.loadFromSnapshot(startVersion, endVersion)
Just to double check: the readOnly flag is not needed, unlike the load() path, do I understand correctly? If so, shall we implement just one of the two and have the other call it, to reduce redundant code?
Yes. Good catch.
 */
trait SupportsFineGrainedReplayFromSnapshot {
  /**
   * Used by snapshotStartBatchId option when reading state generated by join operation as data
Let's not couple too much with implementation details, especially the current implementation of the Spark codebase. A 3rd-party state store provider does not need to know about this. If we think this is needed to push them to implement the two different methods, let's just leave this method at the interface level and wrap the state store with a read-only wrapper on the caller side.
 * @param snapshotVersion checkpoint version of the snapshot to start with
 * @param endVersion checkpoint version to end with
 */
def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore
Shall we just provide a default implementation that wraps the read-write state store as read-only? Then state store providers wouldn't need to implement this except when they can optimize specifically for the read-only case.
You can update the comment to explain why they may want to implement this (or just leave it as the default), instead of describing where this method will be called.
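The suggestion can be sketched with simplified types (these traits are stand-ins for ReadStateStore/StateStore, not Spark's real interfaces): the trait supplies a default read-only variant that delegates to the read-write one, so providers override it only when they can optimize for reads.

```scala
// Simplified stand-ins for the store interfaces.
trait ReadStoreSketch { def get(key: String): Option[String] }
trait StoreSketch extends ReadStoreSketch { def put(key: String, value: String): Unit }

trait ReplaySketch {
  def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StoreSketch

  // Default: reuse the read-write replay; a StoreSketch already is a
  // ReadStoreSketch, so callers see only the read-only view.
  def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStoreSketch =
    replayStateFromSnapshot(snapshotVersion, endVersion)
}

// A provider only has to implement the read-write variant.
class DemoProvider extends ReplaySketch {
  override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StoreSketch =
    new StoreSketch {
      private val data =
        scala.collection.mutable.Map("k" -> s"v$snapshotVersion-$endVersion")
      override def get(key: String): Option[String] = data.get(key)
      override def put(key: String, value: String): Unit = data(key) = value
    }
}
```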
@@ -264,3 +296,8 @@ class StateStoreValueRowFormatValidationFailure(errorMsg: String)
  extends SparkRuntimeException(
    errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE",
    messageParameters = Map("errorMsg" -> errorMsg))

class StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String)
 extends SparkUnsupportedOperationException(
nit: one more space
Where to insert?
Before `extends`; it's only one space.
stateStoreProvider.getStore(stateInfo.get.storeVersion)
if (snapshotStartVersion.isDefined) {
  if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplayFromSnapshot]) {
    StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
ditto
@@ -796,4 +973,141 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
    testForSide("right")
  }
}

protected def testSnapshotNotFound(): Unit = {
  withTempDir(tempDir => {
nit: according to the Databricks Scala style, this should be `withTempDir { tempDir =>`; it could save one indentation (curly brace)
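For reference, a minimal hypothetical withTempDir helper (not Spark's actual test utility) showing the two call styles:

```scala
import java.io.File
import java.nio.file.Files

// Tiny stand-in for the test helper: create a temp dir, run the body, clean up.
def withTempDir[T](f: File => T): T = {
  val dir = Files.createTempDirectory("state-test-").toFile
  try f(dir)
  finally {
    Option(dir.listFiles()).foreach(_.foreach(_.delete()))
    dir.delete()
  }
}

// Parenthesized form, as in the reviewed code:
val a = withTempDir(tempDir => { tempDir.getName })

// Braced form suggested by the review; one less nesting level:
val b = withTempDir { tempDir => tempDir.getName }
```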
provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
  .replayReadStateFromSnapshot(1, 2)
}
checkError(exc, "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED")
It would be nice if we could provide users a better error message, e.g. that the snapshot file does not exist, but I'm OK with addressing this later.
Let's address it later along with the changelog file not found exception.
}

protected def testGetReadStoreWithStartVersion(): Unit = {
  withTempDir(tempDir => {
ditto
}

protected def testSnapshotPartitionId(): Unit = {
  withTempDir(tempDir => {
ditto
.option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
.option(
  StateSourceOptions.SNAPSHOT_PARTITION_ID,
  spark.sessionState.conf.numShufflePartitions)
It just needs to be > 0.
I see, it is because of the limit operator.
})
}

// TODO: Should also test against state generated by 3.5
Is this a remaining TODO, or does it not need to be done at all? If we don't need it, let's remove the golden files for 3.5. I guess it's not intentional to test cross-version compatibility.
checkAnswer(stateSnapshotDf, stateDf)
}

protected def testSnapshotOnLimitState(providerName: String): Unit = {
General comment for tests using golden files: please leave the code that built the golden file (the query you used) as a comment, to let others be able to rebuild the golden file if needed.
}

/**
 * Consturct the state at endVersion from snapshot from snapshotVersion.
nit: Construct the state at
@@ -367,6 +368,22 @@ private[sql] class RocksDBStateStoreProvider
  private def verify(condition: => Boolean, msg: String): Unit = {
    if (!condition) { throw new IllegalStateException(msg) }
  }

override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
Can you add a small function comment here?
errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE",
messageParameters = Map(
  "fileToRead" -> fileToRead,
  "clazz" -> clazz))
Is this a common convention for the parameter naming? These will be visible in the error message that is thrown, correct?
It seems so. The parameter names themselves will not appear. I learned from here: https://github.com/apache/spark/blob/6bfeb094248269920df8b107c86f0982404935cd/common/utils/src/main/resources/error/error-conditions.json#L236C54-L236C59
protected def testSnapshotOnDeduplicateState(providerName: String): Unit = {
  /** The golden files are generated by:
  withSQLConf({
nit: the indentation seems odd in these places, but maybe not a big deal for such comments
Will move one tab right.
}
*/
val resourceUri = this.getClass.getResource(
  s"/structured-streaming/checkpoint-version-4.0.0/$providerName/limit/"
I thought we were going to run against 3.5.1 and then run the query once to generate the operator metadata. Did we decide against that?
Strictly speaking, the test that a checkpoint with no operator metadata can create operator metadata should have been done in the state metadata testing. If we don't have one, we'd better add one, but there is no need to couple it with this PR.
lgtm - pending some minor comments
Only nits and minors. Thanks for the patience!
@@ -900,7 +900,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
 */
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
  val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
  logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
  logInfo(log"Retrieved snapshot at version " +
    log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
nit: space after `version`, as the next string does not start with a space.
@@ -917,9 +918,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
    ReadStateStore = {
  val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
  logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
  logInfo(log"Retrieved snapshot at version " +
    log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
nit: same here
Thanks for all the careful checks by @HeartSaVioR @anishshri-db @WweiL. This PR is ready to merge.
+1
Thanks! Merging to master.
…to State Data Source

### What changes were proposed in this pull request?
In #46944 and #47188, we introduced some new options to the State Data Source. This PR aims to explain these new features in the documentation.

### Why are the changes needed?
It is necessary to reflect the latest change in the documentation website.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The API Doc website can be rendered correctly.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47274 from eason-yuchen-liu/snapshot-doc.

Authored-by: Yuchen Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
What changes were proposed in this pull request?

This PR defines two new options, snapshotStartBatchId and snapshotPartitionId, for the existing state reader. Both of them should be provided at the same time.

1. When there is no snapshot file at `snapshotStartBatch` (note there is an off-by-one issue between version and batch Id), throw an exception.
2. Otherwise, the reader should continue to rebuild the state by reading delta files only, and ignore all snapshot files afterwards.
3. Note that if a `batchId` option is already specified, that batchId is the ending batchId; we should then end at that batchId.
4. This feature supports state generated by the HDFS state store provider and the RocksDB state store provider with changelog checkpointing enabled. It does not support RocksDB with changelog disabled, which is the default for RocksDB.

Why are the changes needed?

Sometimes when a snapshot is corrupted, users want to bypass it when reading a later state. This PR gives users the ability to specify the starting snapshot version and partition. This feature can be useful for debugging.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Created test cases for edge cases of the new options' input. Created a test for the new public function `replayReadStateFromSnapshot`. Created integration tests for the new options against four stateful operators: limit, aggregation, deduplication, and stream-stream join. Instead of generating states within the tests, which is unstable, I prepared golden files for the integration tests.

Was this patch authored or co-authored using generative AI tooling?

No.
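The off-by-one relationship mentioned in point 1 can be sketched as a small helper (hypothetical, not part of the PR): batch N commits state store version N + 1, so the snapshotStartBatchId option selects the snapshot at version batchId + 1.

```scala
// Illustrative mapping from the snapshotStartBatchId option to the
// state store snapshot version it selects.
def snapshotVersionForBatch(snapshotStartBatchId: Long): Long = {
  require(snapshotStartBatchId >= 0, s"batch id must be >= 0: $snapshotStartBatchId")
  snapshotStartBatchId + 1
}
```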