-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-48772][SS][SQL] State Data Source Change Feed Reader Mode #47188
Conversation
commit 261c671 Author: Yuchen Liu <[email protected]> Date: Tue Jul 2 13:57:57 2024 -0700 solve conflict commit 39d0b17 Merge: 9af25f1 c2d59b0 Author: Yuchen Liu <[email protected]> Date: Tue Jul 2 13:45:12 2024 -0700 rebase to master commit c2d59b0 Merge: 9cf8b25 9af25f1 Author: Yuchen Liu <[email protected]> Date: Tue Jul 2 13:44:50 2024 -0700 Merge branch 'skipSnapshotAtBatch' into state-cdc commit 9af25f1 Merge: 8fa9ef5 fea930a Author: Yuchen Liu <[email protected]> Date: Tue Jul 2 13:23:25 2024 -0700 Merge branch 'apache:master' into skipSnapshotAtBatch commit fea930a Author: Anish Shrigondekar <[email protected]> Date: Wed Jul 3 05:21:50 2024 +0900 [SPARK-48770][SS] Change to read operator metadata once on driver to check if we can find info for numColsPrefixKey used for session window agg queries ### What changes were proposed in this pull request? Change to read operator metadata once on driver to check if we can find info for numColsPrefixKey used for session window agg queries ### Why are the changes needed? Avoid reading the operator metadata file multiple times on the executors ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests ``` ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.datasources.v2.state.RocksDBStateDataSourceReadSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), Idle Worker Monitor for python3 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=tru... [info] Run completed in 1 minute, 39 seconds. [info] Total number of tests run: 14 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 14, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#47167 from anishshri-db/task/SPARK-48770. Authored-by: Anish Shrigondekar <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]> commit 8fa9ef5 Merge: 9dbe295 ee0d306 Author: Yuchen Liu <[email protected]> Date: Tue Jul 2 13:21:01 2024 -0700 Merge branch 'apache:master' into skipSnapshotAtBatch commit 9cf8b25 Author: Yuchen Liu <[email protected]> Date: Tue Jul 2 10:53:53 2024 -0700 add input error tests commit 7354408 Merge: 6d6d511 9dbe295 Author: Yuchen Liu <[email protected]> Date: Tue Jul 2 10:17:34 2024 -0700 Merge branch 'skipSnapshotAtBatch' into state-cdc commit 9dbe295 Author: Yuchen Liu <[email protected]> Date: Mon Jul 1 21:54:33 2024 -0700 minor commit 6d6d511 Author: Yuchen Liu <[email protected]> Date: Mon Jul 1 15:53:04 2024 -0700 move StateStoreChangeDataReader to other files and delete it commit 104ba9c Author: Yuchen Liu <[email protected]> Date: Mon Jul 1 15:36:08 2024 -0700 rename PUT to update commit 12298b2 Author: Yuchen Liu <[email protected]> Date: Mon Jul 1 13:09:02 2024 -0700 minor commit 75839ac Author: Yuchen Liu <[email protected]> Date: Mon Jul 1 13:03:59 2024 -0700 name all cdc to changeData commit ace711c Author: Yuchen Liu <[email protected]> Date: Mon Jul 1 12:49:07 2024 -0700 check validity of input to options commit 3834cc9 Author: Yuchen Liu <[email protected]> Date: Fri Jun 28 17:51:16 2024 -0700 solve format issue commit 337785d Author: Yuchen Liu <[email protected]> Date: Fri Jun 28 17:07:18 2024 -0700 address comments from Anish commit 15a8316 Author: Yuchen Liu <[email protected]> Date: Fri Jun 28 16:46:57 2024 -0700 refactor StateStoreChangeDataReader commit b1eb8c4 Author: Yuchen Liu <[email protected]> Date: Fri Jun 28 15:03:09 2024 -0700 add integration tests to the new features commit 7c6cdad Author: Yuchen Liu <[email protected]> Date: Thu Jun 27 16:35:46 2024 -0700 unify the two traits commit cd6a39b Merge: 271b98e d140708 Author: Yuchen Liu <[email protected]> Date: Thu Jun 27 16:22:45 2024 -0700 Merge branch 'skipSnapshotAtBatch' into state-cdc commit d140708 Author: Yuchen Liu <[email protected]> Date: Thu Jun 27 15:17:06 2024 -0700 provide the script to regenerate golden files commit 4deb63e Author: Yuchen Liu <[email protected]> Date: Thu Jun 27 14:22:00 2024 -0700 throw the exception commit 6f1425d Author: Yuchen Liu <[email protected]> Date: Thu Jun 27 12:09:54 2024 -0700 reflect more comments from Jungtaek commit 42d952f Author: Yuchen Liu <[email protected]> Date: Thu Jun 27 11:11:33 2024 -0700 rename SupportsFineGrainedReplayFromSnapshot to SupportsFineGrainedReplay commit e15213e Author: Yuchen Liu <[email protected]> Date: Thu Jun 27 11:05:50 2024 -0700 rename to startVersion to snapshotVersion to make its function clear commit 271b98e Author: Yuchen Liu <[email protected]> Date: Wed Jun 26 15:46:33 2024 -0700 make sure StateStoreChangeData is used everywhere commit ff5bff2 Merge: 6922595 40b6dc6 Author: Yuchen Liu <[email protected]> Date: Wed Jun 26 15:22:19 2024 -0700 Merge branch 'skipSnapshotAtBatch' into state-cdc commit 40b6dc6 Author: Yuchen Liu <[email protected]> Date: Wed Jun 26 10:59:17 2024 -0700 move error to StateStoreErrors commit 23639f4 Author: Yuchen Liu <[email protected]> Date: Wed Jun 26 10:44:22 2024 -0700 create new error for SupportsFineGrainedReplayFromSnapshot commit 97ee3ef Author: Yuchen Liu <[email protected]> Date: Wed Jun 26 10:25:57 2024 -0700 some naming and formatting comments from Anish and Jungtaek commit 1a23abb Author: Yuchen Liu <[email protected]> Date: Tue Jun 25 14:56:07 2024 -0700 refactor the code to isolate from current state stores used by streaming queries commit 876256e Author: Yuchen Liu <[email protected]> Date: Tue Jun 25 12:29:40 2024 -0700 reflect comments from Jungtaek commit ef9b095 Author: Yuchen Liu <[email protected]> Date: Tue Jun 25 12:08:34 2024 -0700 create integration test against golden files commit 6922595 Author: Yuchen Liu <[email protected]> Date: Mon Jun 24 13:44:19 2024 -0700 stage commit 3ece6f2 Author: Yuchen Liu <[email protected]> Date: Fri Jun 21 21:22:50 2024 -0700 resort error-conditions commit be30817 Author: Yuchen Liu <[email protected]> Date: Fri Jun 21 17:30:12 2024 -0700 Reflect more comments from Anish commit cf84d50 Author: Yuchen Liu <[email protected]> Date: Fri Jun 21 14:02:58 2024 -0700 support hdfs state store provider commit 752cdc7 Author: Yuchen Liu <[email protected]> Date: Thu Jun 20 17:51:33 2024 -0700 separate CDCPartitionReader from StatePartitionReader commit bd87055 Merge: 2184396 2eb6646 Author: Yuchen Liu <[email protected]> Date: Thu Jun 20 17:29:31 2024 -0700 Merge branch 'skipSnapshotAtBatch' into state-cdc commit 2eb6646 Author: Yuchen Liu <[email protected]> Date: Thu Jun 20 17:10:45 2024 -0700 also update the name of StateTable commit 2184396 Author: Yuchen Liu <[email protected]> Date: Thu Jun 20 17:03:18 2024 -0700 hdfs initial implementation commit 3f266c1 Author: Yuchen Liu <[email protected]> Date: Mon Jun 17 09:46:07 2024 -0700 style commit fe9cea1 Author: Yuchen Liu <[email protected]> Date: Fri Jun 14 12:50:21 2024 -0700 address more comments from Anish commit 1870b35 Merge: 4d4cd70 9eb6c76 Author: Yuchen Liu <[email protected]> Date: Thu Jun 13 14:25:23 2024 -0700 Merge branch 'skipSnapshotAtBatch' of https://github.com/eason-yuchen-liu/spark into skipSnapshotAtBatch commit 4d4cd70 Author: Yuchen Liu <[email protected]> Date: Thu Jun 13 14:24:55 2024 -0700 log StateSourceOptions optionally commit 9eb6c76 Merge: 20e1b9c 08e741b Author: Yuchen Liu <[email protected]> Date: Thu Jun 13 14:18:50 2024 -0700 Merge branch 'master' into skipSnapshotAtBatch commit 20e1b9c Author: Yuchen Liu <[email protected]> Date: Thu Jun 13 14:16:14 2024 -0700 address comments from Anish & Wei commit 4825215 Author: Yuchen Liu <[email protected]> Date: Thu Jun 13 11:45:55 2024 -0700 address reviews by Wei partially commit 5229152 Author: Yuchen Liu <[email protected]> Date: Wed Jun 12 11:29:46 2024 -0700 support reading join states commit 61dea35 Author: Yuchen Liu <[email protected]> Date: Tue Jun 11 13:16:56 2024 -0700 minor commit 1656580 Author: Yuchen Liu <[email protected]> Date: Tue Jun 11 12:07:06 2024 -0700 improve doc commit 4ebd078 Author: Yuchen Liu <[email protected]> Date: Tue Jun 11 11:48:30 2024 -0700 move partition error commit dfa712e Author: Yuchen Liu <[email protected]> Date: Tue Jun 11 11:42:09 2024 -0700 clean up and format commit aa337c1 Author: Yuchen Liu <[email protected]> Date: Tue Jun 11 10:22:59 2024 -0700 add new test on partition not found error commit 292ec5d Author: Yuchen Liu <[email protected]> Date: Mon Jun 10 16:54:38 2024 -0700 delete useless test files commit 1a3d20a Author: Yuchen Liu <[email protected]> Date: Mon Jun 10 16:52:22 2024 -0700 make sure test is stable commit eddb3c7 Merge: 9d902d7 5a2f374 Author: Yuchen Liu <[email protected]> Date: Mon Jun 10 11:43:03 2024 -0700 Merge branch 'apache:master' into skipSnapshotAtBatch commit 9d902d7 Author: Yuchen Liu <[email protected]> Date: Mon Jun 10 11:13:02 2024 -0700 test directly on the method instead of end to end commit 07267b5 Author: Yuchen Liu <[email protected]> Date: Fri Jun 7 16:43:45 2024 -0700 allow rocksdb to reconstruct state from a specific checkpoint commit 2475173 Author: Yuchen Liu <[email protected]> Date: Thu Jun 6 10:32:56 2024 -0700 add test cases for two options in HDFS state store commit 7dad0c1 Merge: 6db0e3d 8a0927c Author: Yuchen Liu <[email protected]> Date: Tue Jun 4 15:30:20 2024 -0700 Merge branch 'skipSnapshotAtBatch' of https://github.com/eason-yuchen-liu/spark into skipSnapshotAtBatch commit 6db0e3d Author: Yuchen Liu <[email protected]> Date: Tue Jun 4 15:28:49 2024 -0700 initial implementation
.add("value", valueSchema) | ||
.add("partition_id", IntegerType) | ||
if (sourceOptions.readChangeFeed) { | ||
new StructType() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd expect change_type
and batch_id
to begin with, and even batch ID to be placed earlier (batch_id, change_type).
Given the characteristic of change feed, the output is expected to be ordered by batch ID (among partition IDs, which may be uneasy), or even the data source does not do so, users should be able to do so easily because they will high likely do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
...ore/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
Show resolved
Hide resolved
@@ -132,7 +143,10 @@ case class StateSourceOptions( | |||
storeName: String, | |||
joinSide: JoinSideValues, | |||
snapshotStartBatchId: Option[Long], | |||
snapshotPartitionId: Option[Int]) { | |||
snapshotPartitionId: Option[Int], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While we are here, it'd be nice to structure sub-options as the parameters are now 10 and 5 params aren't common ones. Options for 1) starting with snapshot 2) readChangeFeed can be grouped together and be Option[<option model class>]
for each.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
if (changeStartBatchId.isEmpty) { | ||
throw StateDataSourceErrors.requiredOptionUnspecified(CHANGE_START_BATCH_ID) | ||
} | ||
changeEndBatchId = Option(changeEndBatchId.getOrElse(batchId)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we'll need to make clear the current option batchId
to denote that it is "ending" batch ID - it will help the option to be used among multiple modes.
We could probably design a new option and promote the new option later. Before that, let's simply not fall back - let's require users to specify symmetric option. We could reconsider the option of consolidating "starting batch ID" as well later.
...rc/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
Outdated
Show resolved
Hide resolved
...core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
Outdated
Show resolved
Hide resolved
* @param endVersion | ||
* @return | ||
*/ | ||
def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Strictly saying, 3rd party state store providers can implement their own format of delta/changelog files. We need to define an interface for change data reader, and have a built-in implementation of the interface which works for both HDFS and RocksDB.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think users can extend StateStoreChangeDataReader
in StateStoreChangelog.scala to implement their own change data reader. There are built-in implementation examples in both providers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But StateStoreChangeDataReader is only helpful when they have very similar implementation with the built-in one, right? If they have totally different approach of maintaining changelog, they are going to reimplement everything and it is not clear what needs to be implemented.
An interface is to declare the spec. Whenever we design pluggable functionality, please be sure to define the spec and describe the spec as interface. Don't make others struggle with understanding spec from actual implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point now. I would say to make the interface easier for users to implement, why don't we use the superclass of StateStoreChangeDataReader
which is NextIterator
as the interface. It has two well defined extendible method getNext
and close
and it is also stable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry one last small suggestion - Could we please put the information of tuple in the method doc? Now it's not self-explained.
@@ -55,6 +56,15 @@ object RecordType extends Enumeration { | |||
} | |||
} | |||
|
|||
def getRecordTypeAsString(recordType: RecordType): String = { | |||
recordType match { | |||
case PUT_RECORD => "update" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MERGE_RECORD is a valid one - we just need to add more type to state data source reader.
We'd also like to add a test, but I guess we are yet to address integration for transformWithState and State data source reader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merge is only used in changlog v2. Since this PR only aims for v1, the support of merge should be done later.
finished = true | ||
return null | ||
} | ||
// Todo: Does not support StateStoreChangelogReaderV2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding is that we do not have an information to distinguish whether this needs to use V1 vs V2. Do I understand correctly? Since TWS support in state data source reader isn't done yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@anishshri-db Would SPARK-47047 aim to solve this? If then we can put SPARK-47047 and mark SPARK-47047 as a blocker. (Technically every remaining tickets in transformWithState are blockers for Spark 4.0.0) If it's not aiming to solve this TODO, @eason-yuchen-liu please file a JIRA ticket and put JIRA ticket number.
...org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
Outdated
Show resolved
Hide resolved
@@ -76,14 +82,31 @@ class StateTable( | |||
override def properties(): util.Map[String, String] = Map.empty[String, String].asJava | |||
|
|||
private def isValidSchema(schema: StructType): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My proposal could handle both non-CDF and CDF altogether in the single flow - this still needs a divergence and also every column has its own if or else if. Have you tried with my proposal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry overlooked the code. It is indeed more elegant. Thanks for the suggestion.
…der & change the existing StateStoreChangelogReader to correctly implement the interface
…/spark into readStateChange
@@ -260,7 +270,7 @@ abstract class StateStoreChangelogReader( | |||
} | |||
protected val input: DataInputStream = decompressStream(sourceStream) | |||
|
|||
def close(): Unit = { if (input != null) input.close() } | |||
override protected def close(): Unit = { if (input != null) input.close() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is to make sure StateStoreChangelogReader
correctly implements the interface of NextIterator
, method closeIfNeeded
should be used instead of close
.
@@ -403,7 +403,7 @@ class StateStoreChangelogReaderV2( | |||
|
|||
/** | |||
* Base class representing a iterator that iterates over a range of changelog files in a state | |||
* store. In each iteration, it will return a tuple of (changeType: [[RecordType]], | |||
* store. In each iteration, it will return a ByteArrayPair of (changeType: [[RecordType]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now this needs to be also available to the interface method doc.
@@ -262,7 +262,7 @@ object StateSourceOptions extends DataSourceOptions { | |||
if (changeStartBatchId.isEmpty) { | |||
throw StateDataSourceErrors.requiredOptionUnspecified(CHANGE_START_BATCH_ID) | |||
} | |||
changeEndBatchId = Option( | |||
changeEndBatchId = Some( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change from Option to Some to pass the test concerns me. The difference only happens when we could get null value, and if we perform get in Some(null) this would be problematic anyway.
@eason-yuchen-liu Could you please explain the rationale of the change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh sorry for the confusion. I use Some because it can more clearly show that some Option variables are assigned a value. The only relevant change in this commit to pass test is at line 302.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 pending CI
https://github.com/eason-yuchen-liu/spark/actions/runs/9867632456/job/27250015470 Looks like polling the result from fork seems to be delayed. The GA build for the last commit succeeded in the fork. |
Thanks! Merging to master. |
### What changes were proposed in this pull request? This PR adds ability of showing the evolution of state as Change Data Capture (CDC) format to state data source. An example usage: ``` .format("statestore") .option("readChangeFeed", true) .option("changeStartBatchId", 5) #required .option("changeEndBatchId", 10) #not required, default: latest batch Id available ``` _Note that this mode does not support the option "joinSide"._ ### Why are the changes needed? The current state reader can only return the entire state at a specific version. If an error occurs related to state, knowing the change of state across versions to find out at which version state starts to go wrong is important for debugging purposes. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Adds a new test suite `StateDataSourceChangeDataReadSuite` that includes 1) testing input error 2) testing new API added 3) integration test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#47188 from eason-yuchen-liu/readStateChange. Lead-authored-by: Yuchen Liu <[email protected]> Co-authored-by: Yuchen Liu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
This PR adds ability of showing the evolution of state as Change Data Capture (CDC) format to state data source. An example usage: ``` .format("statestore") .option("readChangeFeed", true) .option("changeStartBatchId", 5) #required .option("changeEndBatchId", 10) #not required, default: latest batch Id available ``` _Note that this mode does not support the option "joinSide"._ The current state reader can only return the entire state at a specific version. If an error occurs related to state, knowing the change of state across versions to find out at which version state starts to go wrong is important for debugging purposes. No. Adds a new test suite `StateDataSourceChangeDataReadSuite` that includes 1) testing input error 2) testing new API added 3) integration test. No. Closes apache#47188 from eason-yuchen-liu/readStateChange. Lead-authored-by: Yuchen Liu <[email protected]> Co-authored-by: Yuchen Liu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
…to State Data Source ### What changes were proposed in this pull request? In #46944 and #47188, we introduced some new options to the State Data Source. This PR aims to explain these new features in the documentation. ### Why are the changes needed? It is necessary to reflect the latest change in the documentation website. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? The API Doc website can be rendered correctly. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47274 from eason-yuchen-liu/snapshot-doc. Authored-by: Yuchen Liu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
This PR adds ability of showing the evolution of state as Change Data Capture (CDC) format to state data source. An example usage: ``` .format("statestore") .option("readChangeFeed", true) .option("changeStartBatchId", 5) #required .option("changeEndBatchId", 10) #not required, default: latest batch Id available ``` _Note that this mode does not support the option "joinSide"._ The current state reader can only return the entire state at a specific version. If an error occurs related to state, knowing the change of state across versions to find out at which version state starts to go wrong is important for debugging purposes. No. Adds a new test suite `StateDataSourceChangeDataReadSuite` that includes 1) testing input error 2) testing new API added 3) integration test. No. Closes apache#47188 from eason-yuchen-liu/readStateChange. Lead-authored-by: Yuchen Liu <[email protected]> Co-authored-by: Yuchen Liu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
…to State Data Source ### What changes were proposed in this pull request? In apache#46944 and apache#47188, we introduced some new options to the State Data Source. This PR aims to explain these new features in the documentation. ### Why are the changes needed? It is necessary to reflect the latest change in the documentation website. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? The API Doc website can be rendered correctly. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#47274 from eason-yuchen-liu/snapshot-doc. Authored-by: Yuchen Liu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
### What changes were proposed in this pull request? This PR adds ability of showing the evolution of state as Change Data Capture (CDC) format to state data source. An example usage: ``` .format("statestore") .option("readChangeFeed", true) .option("changeStartBatchId", 5) #required .option("changeEndBatchId", 10) #not required, default: latest batch Id available ``` _Note that this mode does not support the option "joinSide"._ ### Why are the changes needed? The current state reader can only return the entire state at a specific version. If an error occurs related to state, knowing the change of state across versions to find out at which version state starts to go wrong is important for debugging purposes. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Adds a new test suite `StateDataSourceChangeDataReadSuite` that includes 1) testing input error 2) testing new API added 3) integration test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#47188 from eason-yuchen-liu/readStateChange. Lead-authored-by: Yuchen Liu <[email protected]> Co-authored-by: Yuchen Liu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
…to State Data Source ### What changes were proposed in this pull request? In apache#46944 and apache#47188, we introduced some new options to the State Data Source. This PR aims to explain these new features in the documentation. ### Why are the changes needed? It is necessary to reflect the latest change in the documentation website. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? The API Doc website can be rendered correctly. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#47274 from eason-yuchen-liu/snapshot-doc. Authored-by: Yuchen Liu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
### What changes were proposed in this pull request? This PR adds ability of showing the evolution of state as Change Data Capture (CDC) format to state data source. An example usage: ``` .format("statestore") .option("readChangeFeed", true) .option("changeStartBatchId", 5) #required .option("changeEndBatchId", 10) #not required, default: latest batch Id available ``` _Note that this mode does not support the option "joinSide"._ ### Why are the changes needed? The current state reader can only return the entire state at a specific version. If an error occurs related to state, knowing the change of state across versions to find out at which version state starts to go wrong is important for debugging purposes. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Adds a new test suite `StateDataSourceChangeDataReadSuite` that includes 1) testing input error 2) testing new API added 3) integration test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#47188 from eason-yuchen-liu/readStateChange. Lead-authored-by: Yuchen Liu <[email protected]> Co-authored-by: Yuchen Liu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
…to State Data Source ### What changes were proposed in this pull request? In apache#46944 and apache#47188, we introduced some new options to the State Data Source. This PR aims to explain these new features in the documentation. ### Why are the changes needed? It is necessary to reflect the latest change in the documentation website. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? The API Doc website can be rendered correctly. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#47274 from eason-yuchen-liu/snapshot-doc. Authored-by: Yuchen Liu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
What changes were proposed in this pull request?
This PR adds ability of showing the evolution of state as Change Data Capture (CDC) format to state data source.
An example usage:
Note that this mode does not support the option "joinSide".
Why are the changes needed?
The current state reader can only return the entire state at a specific version. If an error occurs related to state, knowing the change of state across versions to find out at which version state starts to go wrong is important for debugging purposes.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Adds a new test suite
StateDataSourceChangeDataReadSuite
that includes 1) testing input error 2) testing new API added 3) integration test.Was this patch authored or co-authored using generative AI tooling?
No.