Skip to content

Commit

Permalink
unify the two traits
Browse files — browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 27, 2024
1 parent cd6a39b commit 7c6cdad
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ class StateStoreChangeDataPartitionReader(
schema: StructType) extends StatePartitionReader(storeConf, hadoopConf, partition, schema) {

private lazy val cdcReader: StateStoreChangeDataReader = {
provider.asInstanceOf[SupportsStateStoreChangeDataFeed]
if (!provider.isInstanceOf[SupportsFineGrainedReplay]) {
throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
provider.getClass.toString)
}
provider.asInstanceOf[SupportsFineGrainedReplay]
.getStateStoreChangeDataReader(
partition.sourceOptions.cdcStartBatchID.get + 1,
partition.sourceOptions.cdcEndBatchId.get + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,9 @@ object StateStoreProvider {
}

/**
* This is an optional trait to be implemented by [[StateStoreProvider]]s that can read fine
* grained state data which is replayed from a specific snapshot version. It is used by the
* snapshotStartBatchId option in state data source.
* This is an optional trait to be implemented by [[StateStoreProvider]]s that can read the change
* of state store over batches. This is used by State Data Source with additional options like
* snapshotStartBatchId or readChangeFeed.
*/
trait SupportsFineGrainedReplay {
/**
Expand All @@ -469,6 +469,15 @@ trait SupportsFineGrainedReplay {
/**
 * Replay state from the given snapshot version up to `endVersion` and expose the result as a
 * read-only store by wrapping it in a [[WrappedReadStateStore]].
 */
def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore = {
  val replayedStore = replayStateFromSnapshot(snapshotVersion, endVersion)
  new WrappedReadStateStore(replayedStore)
}

/**
 * Return a reader over the changes committed to the state store between the given versions.
 * This backs the readChangeFeed option of the State Data Source.
 *
 * NOTE(review): callers appear to pass batchId + 1 as the version — confirm the exact
 * batch-to-version offset and whether the bounds are inclusive against the implementations.
 *
 * @param startVersion first state store version whose changes should be read
 * @param endVersion last state store version whose changes should be read
 * @return a [[StateStoreChangeDataReader]] over the requested version range
 */
def getStateStoreChangeDataReader(startVersion: Long, endVersion: Long):
StateStoreChangeDataReader
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.NextIterator

/**
 * Optional trait for [[StateStoreProvider]]s that are able to serve row-level state change
 * data. Backs the readChangeFeed option of the State Data Source.
 */
trait SupportsStateStoreChangeDataFeed {

  /** Return a reader over state changes for the given version range. */
  def getStateStoreChangeDataReader(
      startVersion: Long,
      endVersion: Long): StateStoreChangeDataReader
}

/**
* Base class for state store changelog reader
* @param fm - checkpoint file manager used to manage streaming query checkpoint
Expand Down

0 comments on commit 7c6cdad

Please sign in to comment.