[SPARK-48772][SS][SQL] State Data Source Change Feed Reader Mode #47188

Closed
Changes from 1 commit
22 commits
1ade442
Squashed commit of the following:
eason-yuchen-liu Jul 2, 2024
98bf8ec
revert unnecessary changes
eason-yuchen-liu Jul 2, 2024
fb890ae
Merge branch 'master' into readStateChange
eason-yuchen-liu Jul 2, 2024
db45c6f
Add comments
eason-yuchen-liu Jul 2, 2024
1926e5e
Merge branch 'readStateChange' of https://github.com/eason-yuchen-liu…
eason-yuchen-liu Jul 2, 2024
24c0351
minor
eason-yuchen-liu Jul 2, 2024
d4a4b80
group options & make options in changeFeed mode isolate from some opt…
eason-yuchen-liu Jul 8, 2024
42552ac
reorder the columns in the result
eason-yuchen-liu Jul 8, 2024
24db837
address comments from Jungtaek
eason-yuchen-liu Jul 8, 2024
adde991
minor
eason-yuchen-liu Jul 8, 2024
d3ca86c
refactor StatePartitionReader for both modes
eason-yuchen-liu Jul 8, 2024
5199c56
minor
eason-yuchen-liu Jul 8, 2024
ce75133
Use NextIterator as the interface rather than StateStoreChangeDataRea…
eason-yuchen-liu Jul 9, 2024
84dcf15
more doc
eason-yuchen-liu Jul 9, 2024
22a086b
Merge branch 'master' into readStateChange
eason-yuchen-liu Jul 9, 2024
c797d0b
use Jungtaek's advice in checking schema validity
eason-yuchen-liu Jul 9, 2024
5921479
Merge branch 'readStateChange' of https://github.com/eason-yuchen-liu…
eason-yuchen-liu Jul 9, 2024
e5674cf
solve column family merge conflict
eason-yuchen-liu Jul 9, 2024
c012e1a
pass tests
eason-yuchen-liu Jul 9, 2024
ff0cd43
continue
eason-yuchen-liu Jul 9, 2024
2ad7590
make the doc consistent
eason-yuchen-liu Jul 10, 2024
43420f6
continue
eason-yuchen-liu Jul 10, 2024
@@ -471,12 +471,17 @@ trait SupportsFineGrainedReplay {
}

/**
- * Return a [[StateStoreChangeDataReader]] that reads the changelogs entries from startVersion to
+ * Return an iterator that reads all the entries of changelogs from startVersion to
  * endVersion.
+ * Each record is represented by a tuple of (recordType: [[RecordType.Value]], key: [[UnsafeRow]],
+ * value: [[UnsafeRow]], batchId: [[Long]])
+ * A put record is returned as a tuple(recordType, key, value, batchId)
+ * A delete record is return as a tuple(recordType, key, null, batchId)
  *
  * @param startVersion starting changelog version
  * @param endVersion ending changelog version
- * @return
+ * @return tuple(recordType: [[RecordType.Value]], nested key: [[UnsafeRow]],
+ * nested value: [[UnsafeRow]], batchId: [[Long]])
*/
def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):
Contributor

Strictly speaking, third-party state store providers can implement their own format for delta/changelog files. We need to define an interface for the change data reader and provide a built-in implementation of that interface that works for both HDFS and RocksDB.

Contributor Author

eason-yuchen-liu Jul 8, 2024

I think users can extend StateStoreChangeDataReader in StateStoreChangelog.scala to implement their own change data reader. Both providers contain built-in implementations that can serve as examples.

Contributor

But StateStoreChangeDataReader only helps when their implementation is very similar to the built-in one, right? If they take a totally different approach to maintaining the changelog, they will have to reimplement everything, and it is not clear what needs to be implemented.

An interface declares the spec. Whenever we design pluggable functionality, please be sure to define the spec and describe it as an interface. Don't make others struggle to understand the spec from the actual implementation.

Contributor Author

I see your point now. To make the interface easier for users to implement, why don't we use NextIterator, the superclass of StateStoreChangeDataReader, as the interface? It has two well-defined extensible methods, getNext and close, and it is also stable.
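
To illustrate the proposal above, here is a minimal, self-contained sketch of what implementing a change data reader against a NextIterator-style contract could look like. `SimpleNextIterator` is a simplified stand-in written for this example and is not Spark's actual `NextIterator`; `InMemoryChangeReader` is hypothetical, and plain strings stand in for `RecordType.Value` and `UnsafeRow`.

```scala
// Simplified stand-in for a NextIterator-style base class (an assumption for
// illustration, not Spark's real class). Subclasses only implement getNext and close.
abstract class SimpleNextIterator[U] extends Iterator[U] {
  private var buffered: Option[U] = None
  private var closed = false
  // Subclasses set this to true from getNext() when there is nothing left to read.
  protected var finished = false

  /** Return the next element, or set `finished = true` if the source is exhausted. */
  protected def getNext(): U

  /** Release underlying resources. Called at most once. */
  protected def close(): Unit

  def closeIfNeeded(): Unit = if (!closed) { closed = true; close() }

  override def hasNext: Boolean = {
    if (!finished && buffered.isEmpty) {
      val candidate = getNext()
      if (finished) closeIfNeeded() else buffered = Some(candidate)
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    val value = buffered.get
    buffered = None
    value
  }
}

// Hypothetical third-party reader. Each record is (recordType, key, value, batchId);
// strings replace RecordType.Value and UnsafeRow to keep the sketch dependency-free.
final class InMemoryChangeReader(records: Seq[(String, String, String, Long)])
    extends SimpleNextIterator[(String, String, String, Long)] {
  private val underlying = records.iterator
  var closeCount = 0 // exposed only so the example can observe cleanup

  override protected def getNext(): (String, String, String, Long) =
    if (underlying.hasNext) underlying.next()
    else { finished = true; null }

  override protected def close(): Unit = closeCount += 1
}
```

The point of the sketch is that a provider with a completely different changelog format only has to supply `getNext` and `close`; buffering and one-time cleanup stay in the base class.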

Contributor

Sorry, one last small suggestion: could we please put the tuple information in the method doc? Right now it is not self-explanatory.

NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)]
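
As a rough illustration of the tuple layout described in the method doc, the sketch below shows one way a caller might consume (recordType, key, value, batchId) records, where a delete record carries a null value. `ChangeType`, `ChangeRecords`, `describe`, and `replay` are hypothetical names introduced for this example, and strings stand in for `UnsafeRow`.

```scala
// Hypothetical stand-in for the record type enumeration; only two kinds are modeled.
object ChangeType extends Enumeration {
  val Put, Delete = Value
}

object ChangeRecords {
  // (recordType, key, value, batchId); value is null for delete records.
  type ChangeRecord = (ChangeType.Value, String, String, Long)

  /** Render a single change record as a human-readable line. */
  def describe(record: ChangeRecord): String = record match {
    case (ChangeType.Put, key, value, batchId) => s"batch $batchId: put $key -> $value"
    case (ChangeType.Delete, key, _, batchId)  => s"batch $batchId: delete $key"
    case (other, key, _, batchId)              => s"batch $batchId: $other $key"
  }

  /** Apply the records in order to an empty map to rebuild the resulting state. */
  def replay(records: Iterator[ChangeRecord]): Map[String, String] =
    records.foldLeft(Map.empty[String, String]) {
      case (state, (ChangeType.Put, key, value, _)) => state.updated(key, value)
      case (state, (ChangeType.Delete, key, _, _))  => state - key
      case (state, _)                               => state
    }
}
```

Matching on the record type first means the null value of a delete record is never dereferenced.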
@@ -403,7 +403,7 @@ class StateStoreChangelogReaderV2(

/**
* Base class representing a iterator that iterates over a range of changelog files in a state
- * store. In each iteration, it will return a tuple of (changeType: [[RecordType]],
+ * store. In each iteration, it will return a ByteArrayPair of (changeType: [[RecordType]],
Contributor

Now this also needs to be available in the interface method doc.

* nested key: [[UnsafeRow]], nested value: [[UnsafeRow]], batchId: [[Long]])
*
* @param fm checkpoint file manager used to manage streaming query checkpoint
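
The doc comment above describes an iterator that walks a range of changelog files. A minimal sketch of that idea, under stated assumptions, follows: `ChangelogRange`, `readRange`, and the `openVersion` callback are hypothetical, strings stand in for `RecordType` and `UnsafeRow`, and each record is tagged with the changelog version it came from (how a version maps to a batch ID is deliberately left out).

```scala
object ChangelogRange {
  // (changeType, key, value) as stored in a single changelog file.
  type RawChange = (String, String, String)
  // (changeType, key, value, version of the changelog file the record came from).
  type VersionedChange = (String, String, String, Long)

  /**
   * Chain the per-version readers for startVersion..endVersion (inclusive) into
   * one iterator. `openVersion` is a hypothetical callback that opens the
   * changelog for a single version.
   */
  def readRange(
      startVersion: Long,
      endVersion: Long,
      openVersion: Long => Iterator[RawChange]): Iterator[VersionedChange] = {
    require(startVersion <= endVersion, "startVersion must not exceed endVersion")
    (startVersion to endVersion).iterator.flatMap { version =>
      openVersion(version).map { case (changeType, key, value) =>
        (changeType, key, value, version)
      }
    }
  }
}
```

A caller would pass a function that knows how to read one changelog file, for example `ChangelogRange.readRange(1L, 3L, readOneFile)`, where `readOneFile` is whatever the provider supplies.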