Commit 271b98e

make sure StateStoreChangeData is used everywhere

eason-yuchen-liu committed Jun 26, 2024
1 parent ff5bff2
Showing 4 changed files with 19 additions and 15 deletions.
@@ -41,7 +41,7 @@ class StatePartitionReaderFactory(
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
     if (stateStoreInputPartition.sourceOptions.modeType == StateDataSourceModeType.CDC) {
-      new StateCDCPartitionReader(storeConf, hadoopConf,
+      new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
         stateStoreInputPartition, schema)
     } else {
       new StatePartitionReader(storeConf, hadoopConf,
@@ -151,16 +151,17 @@ class StatePartitionReader(
   }
 }

-class StateCDCPartitionReader(
+class StateStoreChangeDataPartitionReader(
     storeConf: StateStoreConf,
     hadoopConf: SerializableConfiguration,
     partition: StateStoreInputPartition,
     schema: StructType) extends StatePartitionReader(storeConf, hadoopConf, partition, schema) {

-  private lazy val cdcReader: StateChangeDataReader = {
-    provider.getStateChangeDataReader(
-      partition.sourceOptions.cdcStartBatchID.get + 1,
-      partition.sourceOptions.cdcEndBatchId.get + 1)
+  private lazy val cdcReader: StateStoreChangeDataReader = {
+    provider.asInstanceOf[SupportsStateStoreChangeDataFeed]
+      .getStateStoreChangeDataReader(
+        partition.sourceOptions.cdcStartBatchID.get + 1,
+        partition.sourceOptions.cdcEndBatchId.get + 1)
   }

   override protected lazy val iter: Iterator[InternalRow] = {
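Aside on the hunk above: the reader now goes through provider.asInstanceOf[SupportsStateStoreChangeDataFeed] because getStateStoreChangeDataReader lives on an optional trait rather than on the provider base type, and the + 1 on both batch IDs presumably maps 0-based batch IDs to the state store versions those batches commit. A minimal, self-contained Scala sketch of that cast pattern; SupportsChangeFeedDemo, DemoProvider, and the string payload are hypothetical stand-ins, not Spark classes:

trait SupportsChangeFeedDemo {
  def getChangeDataReader(startVersion: Long, endVersion: Long): Iterator[String]
}

class DemoProvider extends SupportsChangeFeedDemo {
  override def getChangeDataReader(startVersion: Long, endVersion: Long): Iterator[String] =
    (startVersion to endVersion).iterator.map(v => s"changes for version $v")
}

object CastDemo extends App {
  // The caller only holds a base-typed reference, so it must downcast
  // before it can ask for the change feed -- the same shape as the diff above.
  val provider: AnyRef = new DemoProvider
  val feed = provider.asInstanceOf[SupportsChangeFeedDemo]
  feed.getChangeDataReader(1, 3).foreach(println)
}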
@@ -72,7 +72,7 @@ import org.apache.spark.util.ArrayImplicits._
  * store.
  */
 private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging
-  with SupportsFineGrainedReplayFromSnapshot {
+  with SupportsFineGrainedReplayFromSnapshot with SupportsStateStoreChangeDataFeed {

   private val providerName = "HDFSBackedStateStoreProvider"

@@ -976,7 +976,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     result
   }

-  override def getStateChangeDataReader(startVersion: Long, endVersion: Long): StateChangeDataReader = {
+  override def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):
+    StateStoreChangeDataReader = {
     new HDFSBackedStateStoreCDCReader(fm, baseDir, startVersion, endVersion,
       CompressionCodec.createCodec(sparkConf, storeConf.compressionCodec),
       keySchema, valueSchema)
@@ -35,7 +35,7 @@ import org.apache.spark.util.Utils

 private[sql] class RocksDBStateStoreProvider
   extends StateStoreProvider with Logging with Closeable
-  with SupportsFineGrainedReplayFromSnapshot {
+  with SupportsFineGrainedReplayFromSnapshot with SupportsStateStoreChangeDataFeed {
   import RocksDBStateStoreProvider._

   class RocksDBStateStore(lastVersion: Long) extends StateStore {

@@ -403,7 +403,8 @@ private[sql] class RocksDBStateStoreProvider
     }
   }

-  override def getStateChangeDataReader(startVersion: Long, endVersion: Long): StateChangeDataReader = {
+  override def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):
+    StateStoreChangeDataReader = {
     val statePath = stateStoreId.storeCheckpointLocation()
     val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
     new RocksDBStateStoreCDCReader(
@@ -30,8 +30,9 @@ import org.apache.spark.util.NextIterator
  * This is an optional trait for [[StateStoreProvider]]s to mix in if they support reading state
  * change data. It is used by the readChangeFeed option of State Data Source.
  */
-trait SupportsStateChangeDataFeed {
-  def getStateChangeDataReader(startVersion: Long, endVersion: Long): StateChangeDataReader
+trait SupportsStateStoreChangeDataFeed {
+  def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):
+    StateStoreChangeDataReader
 }

 /**
@@ -40,7 +41,7 @@ trait SupportsStateChangeDataFeed {
  * @param fileToRead - name of file to use to read changelog
  * @param compressionCodec - de-compression method using for reading changelog file
  */
-abstract class StateChangeDataReader(
+abstract class StateStoreChangeDataReader(
     fm: CheckpointFileManager,
     stateLocation: Path,
     startVersion: Long,

@@ -93,7 +94,7 @@ class HDFSBackedStateStoreCDCReader(
     keySchema: StructType,
     valueSchema: StructType
   )
-  extends StateChangeDataReader(
+  extends StateStoreChangeDataReader(
     fm, stateLocation, startVersion, endVersion, compressionCodec) {
   override protected var changelogSuffix: String = "delta"

@@ -140,7 +141,7 @@ class RocksDBStateStoreCDCReader(
     keySchema: StructType,
     valueSchema: StructType
   )
-  extends StateChangeDataReader(
+  extends StateStoreChangeDataReader(
     fm, stateLocation, startVersion, endVersion, compressionCodec) {
   override protected var changelogSuffix: String = "changelog"
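For context beyond the diff: the trait's doc comment ties this API to the readChangeFeed option of the State Data Source. A minimal usage sketch of how that might look from the read side, assuming the source is registered under the "statestore" format name and that the batch-range option keys mirror the cdcStartBatchID/cdcEndBatchId fields seen in the partition options; all three names are assumptions about a work in progress, not confirmed by this commit:

import org.apache.spark.sql.SparkSession

object ChangeFeedReadSketch extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("state-store-change-feed-sketch")
    .getOrCreate()

  // Assumed format name and option keys -- see the caveats above.
  val changes = spark.read
    .format("statestore")
    .option("path", "/tmp/streaming-query/checkpoint") // checkpoint of a streaming query
    .option("readChangeFeed", "true")                  // named in the trait's doc comment
    .option("cdcStartBatchId", "0")                    // assumed key for the start batch
    .option("cdcEndBatchId", "5")                      // assumed key for the end batch
    .load()

  changes.show()
  spark.stop()
}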
