Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48772][SS][SQL] State Data Source Change Feed Reader Mode #47188

Closed
Closed
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1ade442
Squashed commit of the following:
eason-yuchen-liu Jul 2, 2024
98bf8ec
revert unnecessary changes
eason-yuchen-liu Jul 2, 2024
fb890ae
Merge branch 'master' into readStateChange
eason-yuchen-liu Jul 2, 2024
db45c6f
Add comments
eason-yuchen-liu Jul 2, 2024
1926e5e
Merge branch 'readStateChange' of https://github.com/eason-yuchen-liu…
eason-yuchen-liu Jul 2, 2024
24c0351
minor
eason-yuchen-liu Jul 2, 2024
d4a4b80
group options & make options in changeFeed mode isolate from some opt…
eason-yuchen-liu Jul 8, 2024
42552ac
reorder the columns in the result
eason-yuchen-liu Jul 8, 2024
24db837
address comments from Jungtaek
eason-yuchen-liu Jul 8, 2024
adde991
minor
eason-yuchen-liu Jul 8, 2024
d3ca86c
refactor StatePartitionReader for both modes
eason-yuchen-liu Jul 8, 2024
5199c56
minor
eason-yuchen-liu Jul 8, 2024
ce75133
Use NextIterator as the interface rather than StateStoreChangeDataRea…
eason-yuchen-liu Jul 9, 2024
84dcf15
more doc
eason-yuchen-liu Jul 9, 2024
22a086b
Merge branch 'master' into readStateChange
eason-yuchen-liu Jul 9, 2024
c797d0b
use Jungtaek's advice in checking schema validity
eason-yuchen-liu Jul 9, 2024
5921479
Merge branch 'readStateChange' of https://github.com/eason-yuchen-liu…
eason-yuchen-liu Jul 9, 2024
e5674cf
solve column family merge conflict
eason-yuchen-liu Jul 9, 2024
c012e1a
pass tests
eason-yuchen-liu Jul 9, 2024
ff0cd43
continue
eason-yuchen-liu Jul 9, 2024
2ad7590
make the doc consistent
eason-yuchen-liu Jul 10, 2024
43420f6
continue
eason-yuchen-liu Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ object StateSourceOptions extends DataSourceOptions {
if (changeStartBatchId.isEmpty) {
throw StateDataSourceErrors.requiredOptionUnspecified(CHANGE_START_BATCH_ID)
}
changeEndBatchId = Option(
changeEndBatchId = Some(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change from Option to Some to pass the test concerns me. The difference only happens when we could get null value, and if we perform get in Some(null) this would be problematic anyway.
@eason-yuchen-liu Could you please explain the rationale of the change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry for the confusion. I use Some because it can more clearly show that some Option variables are assigned a value. The only relevant change in this commit to pass test is at line 302.

changeEndBatchId.getOrElse(getLastCommittedBatch(sparkSession, resolvedCpLocation)))

// changeStartBatchId and changeEndBatchId must all be defined at this point
Expand All @@ -276,7 +276,7 @@ object StateSourceOptions extends DataSourceOptions {
s"value, make sure that $CHANGE_START_BATCH_ID is less than ${changeEndBatchId.get}.")
}

batchId = Option(changeEndBatchId.get)
batchId = Some(changeEndBatchId.get)

readChangeFeedOptions = Option(
ReadChangeFeedOptions(changeStartBatchId.get, changeEndBatchId.get))
Expand All @@ -290,7 +290,7 @@ object StateSourceOptions extends DataSourceOptions {
s"Only specify this option when $READ_CHANGE_FEED is set to true.")
}

batchId = Option(batchId.getOrElse(getLastCommittedBatch(sparkSession, resolvedCpLocation)))
batchId = Some(batchId.getOrElse(getLastCommittedBatch(sparkSession, resolvedCpLocation)))

if (batchId.get < 0) {
throw StateDataSourceErrors.invalidOptionValueIsNegative(BATCH_ID)
Expand All @@ -299,7 +299,7 @@ object StateSourceOptions extends DataSourceOptions {
throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_START_BATCH_ID)
} else if (snapshotStartBatchId.exists(_ > batchId.get)) {
throw StateDataSourceErrors.invalidOptionValue(
SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to $batchId")
SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to ${batchId.get}")
}
if (snapshotPartitionId.exists(_ < 0)) {
throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID)
Expand All @@ -313,7 +313,7 @@ object StateSourceOptions extends DataSourceOptions {
}

if (snapshotStartBatchId.isDefined && snapshotPartitionId.isDefined) {
fromSnapshotOptions = Option(
fromSnapshotOptions = Some(
FromSnapshotOptions(snapshotStartBatchId.get, snapshotPartitionId.get))
}
}
Expand Down