-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-48772][SS][SQL] State Data Source Change Feed Reader Mode #47188
Changes from all commits
1ade442
98bf8ec
fb890ae
db45c6f
1926e5e
24c0351
d4a4b80
42552ac
24db837
adde991
d3ca86c
5199c56
ce75133
84dcf15
22a086b
c797d0b
5921479
e5674cf
c012e1a
ff0cd43
2ad7590
43420f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DI | |
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide} | ||
import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId} | ||
import org.apache.spark.sql.sources.DataSourceRegister | ||
import org.apache.spark.sql.types.{IntegerType, StructType} | ||
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType} | ||
import org.apache.spark.sql.util.CaseInsensitiveStringMap | ||
import org.apache.spark.util.SerializableConfiguration | ||
|
||
|
@@ -94,10 +94,20 @@ class StateDataSource extends TableProvider with DataSourceRegister { | |
manager.readSchemaFile() | ||
} | ||
|
||
new StructType() | ||
.add("key", keySchema) | ||
.add("value", valueSchema) | ||
.add("partition_id", IntegerType) | ||
if (sourceOptions.readChangeFeed) { | ||
new StructType() | ||
.add("batch_id", LongType) | ||
.add("change_type", StringType) | ||
.add("key", keySchema) | ||
.add("value", valueSchema) | ||
.add("partition_id", IntegerType) | ||
} else { | ||
new StructType() | ||
.add("key", keySchema) | ||
.add("value", valueSchema) | ||
.add("partition_id", IntegerType) | ||
} | ||
|
||
eason-yuchen-liu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} catch { | ||
case NonFatal(e) => | ||
throw StateDataSourceErrors.failedToReadStateSchema(sourceOptions, e) | ||
|
@@ -125,21 +135,38 @@ class StateDataSource extends TableProvider with DataSourceRegister { | |
override def supportsExternalMetadata(): Boolean = false | ||
} | ||
|
||
/**
 * Options that identify a state-store snapshot to start reading from.
 *
 * @param snapshotStartBatchId batch id of the snapshot to begin reconstruction from
 * @param snapshotPartitionId  the single partition whose snapshot is read; required together
 *                             with the batch id because each partition may have a different
 *                             checkpoint status
 */
case class FromSnapshotOptions(
    snapshotStartBatchId: Long,
    snapshotPartitionId: Int)
|
||
/**
 * Options for reading the state change feed over a range of batches.
 *
 * @param changeStartBatchId first batch id of changes to read; must be >= 0 and
 *                           <= changeEndBatchId (validated at option parsing time)
 * @param changeEndBatchId   last batch id of changes to read; when the user does not
 *                           specify it, option parsing fills in the last committed batch
 */
// NOTE: closing paren placed on the parameter line for consistency with the
// sibling FromSnapshotOptions declaration.
case class ReadChangeFeedOptions(
    changeStartBatchId: Long,
    changeEndBatchId: Long)
|
||
/**
 * Fully-resolved options for the state data source.
 *
 * Exactly one read mode is active at a time: plain batch read, snapshot read
 * (fromSnapshotOptions defined), or change feed read (readChangeFeed true and
 * readChangeFeedOptions defined) — mode exclusivity is enforced during option parsing.
 *
 * @param resolvedCpLocation    resolved checkpoint root location
 * @param batchId               batch to read; for change feed reads this is the end batch id
 * @param operatorId            id of the stateful operator to read
 * @param storeName             name of the state store
 * @param joinSide              which side of a stream-stream join to read, or none
 * @param readChangeFeed        whether the change feed reader mode is enabled
 * @param fromSnapshotOptions   present only when reading from a specific snapshot
 * @param readChangeFeedOptions present only when reading the change feed
 */
case class StateSourceOptions(
    resolvedCpLocation: String,
    batchId: Long,
    operatorId: Int,
    storeName: String,
    joinSide: JoinSideValues,
    readChangeFeed: Boolean,
    fromSnapshotOptions: Option[FromSnapshotOptions],
    readChangeFeedOptions: Option[ReadChangeFeedOptions]) {
  // State lives under <checkpoint>/<DIR_NAME_STATE>.
  def stateCheckpointLocation: Path = new Path(resolvedCpLocation, DIR_NAME_STATE)

  override def toString: String = {
    // Always-present options first; snapshot / change-feed options are appended
    // only when defined. Option.fold avoids both `var` mutation and `.get`.
    val base = s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " +
      s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide"
    val snapshotDesc = fromSnapshotOptions.fold("") { o =>
      s", snapshotStartBatchId=${o.snapshotStartBatchId}" +
        s", snapshotPartitionId=${o.snapshotPartitionId}"
    }
    val changeFeedDesc = readChangeFeedOptions.fold("") { o =>
      s", changeStartBatchId=${o.changeStartBatchId}" +
        s", changeEndBatchId=${o.changeEndBatchId}"
    }
    base + snapshotDesc + changeFeedDesc + ")"
  }
}
|
||
|
@@ -151,6 +178,9 @@ object StateSourceOptions extends DataSourceOptions { | |
val JOIN_SIDE = newOption("joinSide") | ||
val SNAPSHOT_START_BATCH_ID = newOption("snapshotStartBatchId") | ||
val SNAPSHOT_PARTITION_ID = newOption("snapshotPartitionId") | ||
val READ_CHANGE_FEED = newOption("readChangeFeed") | ||
val CHANGE_START_BATCH_ID = newOption("changeStartBatchId") | ||
val CHANGE_END_BATCH_ID = newOption("changeEndBatchId") | ||
|
||
object JoinSideValues extends Enumeration { | ||
type JoinSideValues = Value | ||
|
@@ -172,16 +202,6 @@ object StateSourceOptions extends DataSourceOptions { | |
throw StateDataSourceErrors.requiredOptionUnspecified(PATH) | ||
}.get | ||
|
||
val resolvedCpLocation = resolvedCheckpointLocation(hadoopConf, checkpointLocation) | ||
|
||
val batchId = Option(options.get(BATCH_ID)).map(_.toLong).orElse { | ||
Some(getLastCommittedBatch(sparkSession, resolvedCpLocation)) | ||
}.get | ||
|
||
if (batchId < 0) { | ||
throw StateDataSourceErrors.invalidOptionValueIsNegative(BATCH_ID) | ||
} | ||
|
||
val operatorId = Option(options.get(OPERATOR_ID)).map(_.toInt) | ||
.orElse(Some(0)).get | ||
|
||
|
@@ -210,30 +230,97 @@ object StateSourceOptions extends DataSourceOptions { | |
throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME)) | ||
} | ||
|
||
val snapshotStartBatchId = Option(options.get(SNAPSHOT_START_BATCH_ID)).map(_.toLong) | ||
if (snapshotStartBatchId.exists(_ < 0)) { | ||
throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_START_BATCH_ID) | ||
} else if (snapshotStartBatchId.exists(_ > batchId)) { | ||
throw StateDataSourceErrors.invalidOptionValue( | ||
SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to $batchId") | ||
} | ||
val resolvedCpLocation = resolvedCheckpointLocation(hadoopConf, checkpointLocation) | ||
|
||
var batchId = Option(options.get(BATCH_ID)).map(_.toLong) | ||
|
||
val snapshotStartBatchId = Option(options.get(SNAPSHOT_START_BATCH_ID)).map(_.toLong) | ||
val snapshotPartitionId = Option(options.get(SNAPSHOT_PARTITION_ID)).map(_.toInt) | ||
if (snapshotPartitionId.exists(_ < 0)) { | ||
throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID) | ||
} | ||
|
||
// both snapshotPartitionId and snapshotStartBatchId are required at the same time, because | ||
// each partition may have different checkpoint status | ||
if (snapshotPartitionId.isDefined && snapshotStartBatchId.isEmpty) { | ||
throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_START_BATCH_ID) | ||
} else if (snapshotPartitionId.isEmpty && snapshotStartBatchId.isDefined) { | ||
throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_PARTITION_ID) | ||
val readChangeFeed = Option(options.get(READ_CHANGE_FEED)).exists(_.toBoolean) | ||
|
||
val changeStartBatchId = Option(options.get(CHANGE_START_BATCH_ID)).map(_.toLong) | ||
var changeEndBatchId = Option(options.get(CHANGE_END_BATCH_ID)).map(_.toLong) | ||
|
||
var fromSnapshotOptions: Option[FromSnapshotOptions] = None | ||
var readChangeFeedOptions: Option[ReadChangeFeedOptions] = None | ||
|
||
if (readChangeFeed) { | ||
if (joinSide != JoinSideValues.none) { | ||
throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, READ_CHANGE_FEED)) | ||
} | ||
if (batchId.isDefined) { | ||
throw StateDataSourceErrors.conflictOptions(Seq(BATCH_ID, READ_CHANGE_FEED)) | ||
} | ||
if (snapshotStartBatchId.isDefined) { | ||
throw StateDataSourceErrors.conflictOptions(Seq(SNAPSHOT_START_BATCH_ID, READ_CHANGE_FEED)) | ||
} | ||
if (snapshotPartitionId.isDefined) { | ||
throw StateDataSourceErrors.conflictOptions(Seq(SNAPSHOT_PARTITION_ID, READ_CHANGE_FEED)) | ||
} | ||
|
||
if (changeStartBatchId.isEmpty) { | ||
throw StateDataSourceErrors.requiredOptionUnspecified(CHANGE_START_BATCH_ID) | ||
} | ||
changeEndBatchId = Some( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reviewer: The change from Option to Some to pass the test concerns me. The difference only matters when we could get a null value, and if we perform get on Some(null) this would be problematic anyway. Author: Oh, sorry for the confusion. I use Some because it more clearly shows that some Option variables are assigned a value. The only change in this commit relevant to passing the test is at line 302. |
||
changeEndBatchId.getOrElse(getLastCommittedBatch(sparkSession, resolvedCpLocation))) | ||
|
||
// changeStartBatchId and changeEndBatchId must all be defined at this point | ||
if (changeStartBatchId.get < 0) { | ||
throw StateDataSourceErrors.invalidOptionValueIsNegative(CHANGE_START_BATCH_ID) | ||
} | ||
if (changeEndBatchId.get < changeStartBatchId.get) { | ||
throw StateDataSourceErrors.invalidOptionValue(CHANGE_END_BATCH_ID, | ||
s"$CHANGE_END_BATCH_ID cannot be smaller than $CHANGE_START_BATCH_ID. " + | ||
s"Please check the input to $CHANGE_END_BATCH_ID, or if you are using its default " + | ||
s"value, make sure that $CHANGE_START_BATCH_ID is less than ${changeEndBatchId.get}.") | ||
} | ||
|
||
batchId = Some(changeEndBatchId.get) | ||
|
||
readChangeFeedOptions = Option( | ||
ReadChangeFeedOptions(changeStartBatchId.get, changeEndBatchId.get)) | ||
} else { | ||
if (changeStartBatchId.isDefined) { | ||
throw StateDataSourceErrors.invalidOptionValue(CHANGE_START_BATCH_ID, | ||
s"Only specify this option when $READ_CHANGE_FEED is set to true.") | ||
} | ||
if (changeEndBatchId.isDefined) { | ||
throw StateDataSourceErrors.invalidOptionValue(CHANGE_END_BATCH_ID, | ||
s"Only specify this option when $READ_CHANGE_FEED is set to true.") | ||
} | ||
|
||
batchId = Some(batchId.getOrElse(getLastCommittedBatch(sparkSession, resolvedCpLocation))) | ||
|
||
if (batchId.get < 0) { | ||
throw StateDataSourceErrors.invalidOptionValueIsNegative(BATCH_ID) | ||
} | ||
if (snapshotStartBatchId.exists(_ < 0)) { | ||
throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_START_BATCH_ID) | ||
} else if (snapshotStartBatchId.exists(_ > batchId.get)) { | ||
throw StateDataSourceErrors.invalidOptionValue( | ||
SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to ${batchId.get}") | ||
} | ||
if (snapshotPartitionId.exists(_ < 0)) { | ||
throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID) | ||
} | ||
// both snapshotPartitionId and snapshotStartBatchId are required at the same time, because | ||
// each partition may have different checkpoint status | ||
if (snapshotPartitionId.isDefined && snapshotStartBatchId.isEmpty) { | ||
throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_START_BATCH_ID) | ||
} else if (snapshotPartitionId.isEmpty && snapshotStartBatchId.isDefined) { | ||
throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_PARTITION_ID) | ||
} | ||
|
||
if (snapshotStartBatchId.isDefined && snapshotPartitionId.isDefined) { | ||
fromSnapshotOptions = Some( | ||
FromSnapshotOptions(snapshotStartBatchId.get, snapshotPartitionId.get)) | ||
} | ||
} | ||
|
||
StateSourceOptions( | ||
resolvedCpLocation, batchId, operatorId, storeName, | ||
joinSide, snapshotStartBatchId, snapshotPartitionId) | ||
resolvedCpLocation, batchId.get, operatorId, storeName, joinSide, | ||
readChangeFeed, fromSnapshotOptions, readChangeFeedOptions) | ||
} | ||
|
||
private def resolvedCheckpointLocation( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd expect `change_type` and `batch_id` to begin with, and even the batch ID to be placed earlier (`batch_id`, `change_type`). Given the characteristic of a change feed, the output is expected to be ordered by batch ID (among partition IDs, which may be uneasy), and even if the data source does not do so, users should be able to do so easily, because they will highly likely want to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.