[SPARK-48772][SS][SQL] State Data Source Change Feed Reader Mode #47188

Closed
Changes from 12 commits
Commits
22 commits
1ade442
Squashed commit of the following:
eason-yuchen-liu Jul 2, 2024
98bf8ec
revert unnecessary changes
eason-yuchen-liu Jul 2, 2024
fb890ae
Merge branch 'master' into readStateChange
eason-yuchen-liu Jul 2, 2024
db45c6f
Add comments
eason-yuchen-liu Jul 2, 2024
1926e5e
Merge branch 'readStateChange' of https://github.com/eason-yuchen-liu…
eason-yuchen-liu Jul 2, 2024
24c0351
minor
eason-yuchen-liu Jul 2, 2024
d4a4b80
group options & make options in changeFeed mode isolate from some opt…
eason-yuchen-liu Jul 8, 2024
42552ac
reorder the columns in the result
eason-yuchen-liu Jul 8, 2024
24db837
address comments from Jungtaek
eason-yuchen-liu Jul 8, 2024
adde991
minor
eason-yuchen-liu Jul 8, 2024
d3ca86c
refactor StatePartitionReader for both modes
eason-yuchen-liu Jul 8, 2024
5199c56
minor
eason-yuchen-liu Jul 8, 2024
ce75133
Use NextIterator as the interface rather than StateStoreChangeDataRea…
eason-yuchen-liu Jul 9, 2024
84dcf15
more doc
eason-yuchen-liu Jul 9, 2024
22a086b
Merge branch 'master' into readStateChange
eason-yuchen-liu Jul 9, 2024
c797d0b
use Jungtaek's advice in checking schema validity
eason-yuchen-liu Jul 9, 2024
5921479
Merge branch 'readStateChange' of https://github.com/eason-yuchen-liu…
eason-yuchen-liu Jul 9, 2024
e5674cf
solve column family merge conflict
eason-yuchen-liu Jul 9, 2024
c012e1a
pass tests
eason-yuchen-liu Jul 9, 2024
ff0cd43
continue
eason-yuchen-liu Jul 9, 2024
2ad7590
make the doc consistent
eason-yuchen-liu Jul 10, 2024
43420f6
continue
eason-yuchen-liu Jul 10, 2024
@@ -3812,7 +3812,7 @@
"STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY" : {
"message" : [
"The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.SupportsFineGrainedReplay.",
"Therefore, it does not support option snapshotStartBatchId in state data source."
"Therefore, it does not support option snapshotStartBatchId or readChangeFeed in state data source."
],
"sqlState" : "42K06"
},
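For context, a hedged sketch of how this error can surface: configure a state store provider that does not extend SupportsFineGrainedReplay, then request a snapshot or change feed read. The provider class name and checkpoint path below are illustrative and not part of this PR; the "statestore" format name and the providerClass conf key are assumed from the existing data source registration.

// Illustrative only: a hypothetical provider that does not mix in SupportsFineGrainedReplay.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.example.MyStateStoreProvider")

spark.read
  .format("statestore")
  .option("path", "/tmp/streaming-query/checkpoint")  // illustrative checkpoint location
  .option("readChangeFeed", "true")
  .option("changeStartBatchId", "0")
  .load()
  .show()  // the error above surfaces once the scan is actually executed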
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DI
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
import org.apache.spark.sql.sources.DataSourceRegister
- import org.apache.spark.sql.types.{IntegerType, StructType}
+ import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

@@ -94,10 +94,20 @@ class StateDataSource extends TableProvider with DataSourceRegister {
manager.readSchemaFile()
}

- new StructType()
- .add("key", keySchema)
- .add("value", valueSchema)
- .add("partition_id", IntegerType)
+ if (sourceOptions.readChangeFeed) {
+ new StructType()

Contributor:
I'd expect change_type and batch_id to begin with, and even batch ID to be placed earlier (batch_id, change_type).

Given the characteristic of a change feed, the output is expected to be ordered by batch ID (across partition IDs, which may be tricky); even if the data source does not do so, users should be able to order it easily themselves, because they will very likely want to.

Contributor Author:

Makes sense.
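
For illustration (a hedged sketch, not part of this PR), a consumer could impose that ordering after loading the change feed; the column names follow the schema assembled just below, and changeFeedDf stands for a hypothetical DataFrame loaded with readChangeFeed enabled:

changeFeedDf
  .orderBy("batch_id", "partition_id")  // batch ordering first, then partition
  .show(truncate = false)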

.add("batch_id", LongType)
.add("change_type", StringType)
.add("key", keySchema)
.add("value", valueSchema)
.add("partition_id", IntegerType)
} else {
new StructType()
.add("key", keySchema)
.add("value", valueSchema)
.add("partition_id", IntegerType)
}

} catch {
case NonFatal(e) =>
throw StateDataSourceErrors.failedToReadStateSchema(sourceOptions, e)
@@ -125,21 +135,38 @@ class StateDataSource extends TableProvider with DataSourceRegister {
override def supportsExternalMetadata(): Boolean = false
}

+ case class FromSnapshotOptions(
+ snapshotStartBatchId: Long,
+ snapshotPartitionId: Int)
+
+ case class ReadChangeFeedOptions(
+ changeStartBatchId: Long,
+ changeEndBatchId: Long
+ )

case class StateSourceOptions(
resolvedCpLocation: String,
batchId: Long,
operatorId: Int,
storeName: String,
joinSide: JoinSideValues,
- snapshotStartBatchId: Option[Long],
- snapshotPartitionId: Option[Int]) {
+ readChangeFeed: Boolean,
+ fromSnapshotOptions: Option[FromSnapshotOptions],
+ readChangeFeedOptions: Option[ReadChangeFeedOptions]) {
def stateCheckpointLocation: Path = new Path(resolvedCpLocation, DIR_NAME_STATE)

override def toString: String = {
s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " +
s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide, " +
s"snapshotStartBatchId=${snapshotStartBatchId.getOrElse("None")}, " +
s"snapshotPartitionId=${snapshotPartitionId.getOrElse("None")})"
var desc = s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " +
s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide"
if (fromSnapshotOptions.isDefined) {
desc += s", snapshotStartBatchId=${fromSnapshotOptions.get.snapshotStartBatchId}"
desc += s", snapshotPartitionId=${fromSnapshotOptions.get.snapshotPartitionId}"
}
if (readChangeFeedOptions.isDefined) {
desc += s", changeStartBatchId=${readChangeFeedOptions.get.changeStartBatchId}"
desc += s", changeEndBatchId=${readChangeFeedOptions.get.changeEndBatchId}"
}
desc + ")"
}
}

@@ -151,6 +178,9 @@ object StateSourceOptions extends DataSourceOptions {
val JOIN_SIDE = newOption("joinSide")
val SNAPSHOT_START_BATCH_ID = newOption("snapshotStartBatchId")
val SNAPSHOT_PARTITION_ID = newOption("snapshotPartitionId")
+ val READ_CHANGE_FEED = newOption("readChangeFeed")
+ val CHANGE_START_BATCH_ID = newOption("changeStartBatchId")
+ val CHANGE_END_BATCH_ID = newOption("changeEndBatchId")

object JoinSideValues extends Enumeration {
type JoinSideValues = Value
@@ -172,16 +202,6 @@ object StateSourceOptions extends DataSourceOptions {
throw StateDataSourceErrors.requiredOptionUnspecified(PATH)
}.get

- val resolvedCpLocation = resolvedCheckpointLocation(hadoopConf, checkpointLocation)
-
- val batchId = Option(options.get(BATCH_ID)).map(_.toLong).orElse {
- Some(getLastCommittedBatch(sparkSession, resolvedCpLocation))
- }.get
-
- if (batchId < 0) {
- throw StateDataSourceErrors.invalidOptionValueIsNegative(BATCH_ID)
- }
-
val operatorId = Option(options.get(OPERATOR_ID)).map(_.toInt)
.orElse(Some(0)).get

@@ -210,30 +230,95 @@
throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
}

- val snapshotStartBatchId = Option(options.get(SNAPSHOT_START_BATCH_ID)).map(_.toLong)
- if (snapshotStartBatchId.exists(_ < 0)) {
- throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_START_BATCH_ID)
- } else if (snapshotStartBatchId.exists(_ > batchId)) {
- throw StateDataSourceErrors.invalidOptionValue(
- SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to $batchId")
- }
+ val resolvedCpLocation = resolvedCheckpointLocation(hadoopConf, checkpointLocation)
+
+ var batchId = Option(options.get(BATCH_ID)).map(_.toLong)
+
+ val snapshotStartBatchId = Option(options.get(SNAPSHOT_START_BATCH_ID)).map(_.toLong)
val snapshotPartitionId = Option(options.get(SNAPSHOT_PARTITION_ID)).map(_.toInt)
- if (snapshotPartitionId.exists(_ < 0)) {
- throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID)
- }
-
- // both snapshotPartitionId and snapshotStartBatchId are required at the same time, because
- // each partition may have different checkpoint status
- if (snapshotPartitionId.isDefined && snapshotStartBatchId.isEmpty) {
- throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_START_BATCH_ID)
- } else if (snapshotPartitionId.isEmpty && snapshotStartBatchId.isDefined) {
- throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_PARTITION_ID)
+ val readChangeFeed = Option(options.get(READ_CHANGE_FEED)).exists(_.toBoolean)
+
+ val changeStartBatchId = Option(options.get(CHANGE_START_BATCH_ID)).map(_.toLong)
+ var changeEndBatchId = Option(options.get(CHANGE_END_BATCH_ID)).map(_.toLong)
+
+ var fromSnapshotOptions: Option[FromSnapshotOptions] = None
+ var readChangeFeedOptions: Option[ReadChangeFeedOptions] = None
+
+ if (readChangeFeed) {
+ if (joinSide != JoinSideValues.none) {
+ throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, READ_CHANGE_FEED))
+ }
+ if (batchId.isDefined) {
+ throw StateDataSourceErrors.conflictOptions(Seq(BATCH_ID, READ_CHANGE_FEED))
+ }
+ if (snapshotStartBatchId.isDefined) {
+ throw StateDataSourceErrors.conflictOptions(Seq(SNAPSHOT_START_BATCH_ID, READ_CHANGE_FEED))
+ }
+ if (snapshotPartitionId.isDefined) {
+ throw StateDataSourceErrors.conflictOptions(Seq(SNAPSHOT_PARTITION_ID, READ_CHANGE_FEED))
+ }
+
+ if (changeStartBatchId.isEmpty) {
+ throw StateDataSourceErrors.requiredOptionUnspecified(CHANGE_START_BATCH_ID)
+ }
+ changeEndBatchId = Option(
+ changeEndBatchId.getOrElse(getLastCommittedBatch(sparkSession, resolvedCpLocation)))
+
+ // changeStartBatchId and changeEndBatchId must all be defined at this point
+ if (changeStartBatchId.get < 0) {
+ throw StateDataSourceErrors.invalidOptionValueIsNegative(CHANGE_START_BATCH_ID)
+ }
+ if (changeEndBatchId.get < changeStartBatchId.get) {
+ throw StateDataSourceErrors.invalidOptionValue(CHANGE_END_BATCH_ID,
+ s"$CHANGE_END_BATCH_ID cannot be smaller than $CHANGE_START_BATCH_ID. " +
+ s"Please check the input to $CHANGE_END_BATCH_ID, or if you are using its default " +
+ s"value, make sure that $CHANGE_START_BATCH_ID is less than ${changeEndBatchId.get}.")
+ }
+
+ batchId = Option(changeEndBatchId.get)
+
+ readChangeFeedOptions = Option(
+ ReadChangeFeedOptions(changeStartBatchId.get, changeEndBatchId.get))
+ } else {
+ if (changeStartBatchId.isDefined) {
+ throw StateDataSourceErrors.invalidOptionValue(CHANGE_START_BATCH_ID,
+ s"Only specify this option when $READ_CHANGE_FEED is set to true.")
+ }
+ if (changeEndBatchId.isDefined) {
+ throw StateDataSourceErrors.invalidOptionValue(CHANGE_END_BATCH_ID,
+ s"Only specify this option when $READ_CHANGE_FEED is set to true.")
+ }
+
+ batchId = Option(batchId.getOrElse(getLastCommittedBatch(sparkSession, resolvedCpLocation)))
+
+ if (batchId.get < 0) {
+ throw StateDataSourceErrors.invalidOptionValueIsNegative(BATCH_ID)
+ }
+ if (snapshotStartBatchId.exists(_ < 0)) {
+ throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_START_BATCH_ID)
+ } else if (snapshotStartBatchId.exists(_ > batchId.get)) {
+ throw StateDataSourceErrors.invalidOptionValue(
+ SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to $batchId")
+ }
+ if (snapshotPartitionId.exists(_ < 0)) {
+ throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID)
+ }
+ // both snapshotPartitionId and snapshotStartBatchId are required at the same time, because
+ // each partition may have different checkpoint status
+ if (snapshotPartitionId.isDefined && snapshotStartBatchId.isEmpty) {
+ throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_START_BATCH_ID)
+ } else if (snapshotPartitionId.isEmpty && snapshotStartBatchId.isDefined) {
+ throw StateDataSourceErrors.requiredOptionUnspecified(SNAPSHOT_PARTITION_ID)
+ }
+
+ fromSnapshotOptions = Option(
+ FromSnapshotOptions(snapshotStartBatchId.get, snapshotPartitionId.get))
+ }

StateSourceOptions(
- resolvedCpLocation, batchId, operatorId, storeName,
- joinSide, snapshotStartBatchId, snapshotPartitionId)
+ resolvedCpLocation, batchId.get, operatorId, storeName, joinSide,
+ readChangeFeed, fromSnapshotOptions, readChangeFeedOptions)
}

private def resolvedCheckpointLocation(
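To make the option handling above concrete, here is a hedged usage sketch; the checkpoint path is illustrative, and the "statestore" format name is assumed from the DataSourceRegister implementation:

// Change feed mode: changeStartBatchId is required,
// changeEndBatchId defaults to the last committed batch.
val changes = spark.read
  .format("statestore")
  .option("path", "/tmp/streaming-query/checkpoint")
  .option("readChangeFeed", "true")
  .option("changeStartBatchId", "5")
  .load()

// Combining readChangeFeed with batchId, joinSide, snapshotStartBatchId or
// snapshotPartitionId fails the validation above with a conflicting-options error.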
@@ -23,7 +23,9 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, Par
import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataTableEntry
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
import org.apache.spark.sql.execution.streaming.state._
+ import org.apache.spark.sql.execution.streaming.state.RecordType.{getRecordTypeAsString, RecordType}
import org.apache.spark.sql.types.StructType
+ import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration

/**
@@ -37,27 +39,32 @@ class StatePartitionReaderFactory(
stateStoreMetadata: Array[StateMetadataTableEntry]) extends PartitionReaderFactory {

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
- new StatePartitionReader(storeConf, hadoopConf,
- partition.asInstanceOf[StateStoreInputPartition], schema, stateStoreMetadata)
+ val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
+ if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
+ new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
+ stateStoreInputPartition, schema, stateStoreMetadata)
+ } else {
+ new StatePartitionReader(storeConf, hadoopConf,
+ stateStoreInputPartition, schema, stateStoreMetadata)
+ }
}
}

/**
* An implementation of [[PartitionReader]] for State data source. This is used to support
* general read from a state store instance, rather than specific to the operator.
*/
- class StatePartitionReader(
+ abstract class StatePartitionReaderBase(
storeConf: StateStoreConf,
hadoopConf: SerializableConfiguration,
partition: StateStoreInputPartition,
schema: StructType,
stateStoreMetadata: Array[StateMetadataTableEntry])
extends PartitionReader[InternalRow] with Logging {

private val keySchema = SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, "value").asInstanceOf[StructType]

- private lazy val provider: StateStoreProvider = {
+ protected lazy val provider: StateStoreProvider = {
val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
@@ -88,25 +95,7 @@ class StatePartitionReader(
useMultipleValuesPerKey = false)
}

- private lazy val store: ReadStateStore = {
- partition.sourceOptions.snapshotStartBatchId match {
- case None => provider.getReadStore(partition.sourceOptions.batchId + 1)
-
- case Some(snapshotStartBatchId) =>
- if (!provider.isInstanceOf[SupportsFineGrainedReplay]) {
- throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
- provider.getClass.toString)
- }
- provider.asInstanceOf[SupportsFineGrainedReplay]
- .replayReadStateFromSnapshot(
- snapshotStartBatchId + 1,
- partition.sourceOptions.batchId + 1)
- }
- }
-
- private lazy val iter: Iterator[InternalRow] = {
- store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
- }
+ protected val iter: Iterator[InternalRow]

private var current: InternalRow = _

@@ -124,9 +113,46 @@

override def close(): Unit = {
current = null
- store.abort()
provider.close()
}
}

+ /**
+ * An implementation of [[StatePartitionReaderBase]] for the normal mode of State Data
+ * Source. It reads the state at a particular batchId.
+ */
+ class StatePartitionReader(
+ storeConf: StateStoreConf,
+ hadoopConf: SerializableConfiguration,
+ partition: StateStoreInputPartition,
+ schema: StructType,
+ stateStoreMetadata: Array[StateMetadataTableEntry])
+ extends StatePartitionReaderBase(storeConf, hadoopConf, partition, schema, stateStoreMetadata) {
+
+ private lazy val store: ReadStateStore = {
+ partition.sourceOptions.fromSnapshotOptions match {
+ case None => provider.getReadStore(partition.sourceOptions.batchId + 1)
+
+ case Some(fromSnapshotOptions) =>
+ if (!provider.isInstanceOf[SupportsFineGrainedReplay]) {
+ throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
+ provider.getClass.toString)
+ }
+ provider.asInstanceOf[SupportsFineGrainedReplay]
+ .replayReadStateFromSnapshot(
+ fromSnapshotOptions.snapshotStartBatchId + 1,
+ partition.sourceOptions.batchId + 1)
+ }
+ }
+
+ override lazy val iter: Iterator[InternalRow] = {
+ store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
+ }
+
+ override def close(): Unit = {
+ store.abort()
+ super.close()
+ }

private def unifyStateRowPair(pair: (UnsafeRow, UnsafeRow)): InternalRow = {
val row = new GenericInternalRow(3)
@@ -136,3 +162,47 @@
row
}
}

+ /**
+ * An implementation of [[StatePartitionReaderBase]] for the readChangeFeed mode of State Data
+ * Source. It reads the change of state over batches of a particular partition.
+ */
+ class StateStoreChangeDataPartitionReader(
+ storeConf: StateStoreConf,
+ hadoopConf: SerializableConfiguration,
+ partition: StateStoreInputPartition,
+ schema: StructType,
+ stateStoreMetadata: Array[StateMetadataTableEntry])
+ extends StatePartitionReaderBase(storeConf, hadoopConf, partition, schema, stateStoreMetadata) {
+
+ private lazy val changeDataReader: StateStoreChangeDataReader = {
+ if (!provider.isInstanceOf[SupportsFineGrainedReplay]) {
+ throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
+ provider.getClass.toString)
+ }
+ provider.asInstanceOf[SupportsFineGrainedReplay]
+ .getStateStoreChangeDataReader(
+ partition.sourceOptions.readChangeFeedOptions.get.changeStartBatchId + 1,
+ partition.sourceOptions.readChangeFeedOptions.get.changeEndBatchId + 1)
+ }
+
+ override lazy val iter: Iterator[InternalRow] = {
+ changeDataReader.iterator.map(unifyStateChangeDataRow)
+ }
+
+ override def close(): Unit = {
+ changeDataReader.close()
+ super.close()
+ }
+
+ private def unifyStateChangeDataRow(row: (RecordType, UnsafeRow, UnsafeRow, Long)):
+ InternalRow = {
+ val result = new GenericInternalRow(5)
+ result.update(0, row._4)
+ result.update(1, UTF8String.fromString(getRecordTypeAsString(row._1)))
+ result.update(2, row._2)
+ result.update(3, row._3)
+ result.update(4, partition.partition)
+ result
+ }
+ }
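As a reading aid (hedged, derived from unifyStateChangeDataRow and the change feed schema defined in StateDataSource above), the five ordinals written per row line up with the columns batch_id, change_type, key, value and partition_id. Reusing the hypothetical changes DataFrame from the earlier sketch:

// Abbreviated, illustrative output; the key and value struct fields depend on the
// stateful operator whose state is being read.
changes.printSchema()
// root
//  |-- batch_id: long
//  |-- change_type: string
//  |-- key: struct
//  |-- value: struct
//  |-- partition_id: integer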