Skip to content

Commit

Permalink
separate CDCPartitionReader from StatePartitionReader
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 21, 2024
1 parent bd87055 commit 752cdc7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ class StatePartitionReaderFactory(
schema: StructType) extends PartitionReaderFactory {

/**
 * Creates the reader for one input partition of the state data source.
 *
 * Partitions whose source options select [[StateDataSourceModeType.CDC]] are served by a
 * [[StateCDCPartitionReader]]; every other mode gets a plain [[StatePartitionReader]].
 *
 * @param partition the partition to read; it is cast to [[StateStoreInputPartition]], so
 *                  any other implementation fails with a `ClassCastException`
 * @return a reader over the rows of `partition`
 */
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
  // Cast once and reuse the typed partition for both the mode check and the reader.
  val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
  // Exactly one reader is built per call; no throwaway instance is created up front.
  if (stateStoreInputPartition.sourceOptions.modeType == StateDataSourceModeType.CDC) {
    new StateCDCPartitionReader(storeConf, hadoopConf, stateStoreInputPartition, schema)
  } else {
    new StatePartitionReader(storeConf, hadoopConf, stateStoreInputPartition, schema)
  }
}
}

Expand All @@ -57,7 +64,7 @@ class StatePartitionReader(
private val keySchema = SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, "value").asInstanceOf[StructType]

private lazy val provider: StateStoreProvider = {
protected lazy val provider: StateStoreProvider = {
val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
Expand Down Expand Up @@ -104,22 +111,11 @@ class StatePartitionReader(
}
}

private lazy val cdcReader: StateStoreCDCReader = {
provider.getStateStoreCDCReader(
partition.sourceOptions.cdcStartBatchID.get,
partition.sourceOptions.cdcEndBatchId.get)
}

private lazy val iter: Iterator[InternalRow] = {
if (partition.sourceOptions.modeType == StateDataSourceModeType.CDC) {
println("Here!!!!!!")
cdcReader.iterator.map(unifyStateCDCRow)
} else {
store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
}
protected lazy val iter: Iterator[InternalRow] = {
store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
}

private var current: InternalRow = _
protected var current: InternalRow = _

override def next(): Boolean = {
if (iter.hasNext) {
Expand All @@ -136,7 +132,6 @@ class StatePartitionReader(
override def close(): Unit = {
current = null
store.abort()
cdcReader.close()
provider.close()
}

Expand All @@ -147,6 +142,29 @@ class StatePartitionReader(
row.update(2, partition.partition)
row
}
}

class StateCDCPartitionReader(
storeConf: StateStoreConf,
hadoopConf: SerializableConfiguration,
partition: StateStoreInputPartition,
schema: StructType) extends StatePartitionReader(storeConf, hadoopConf, partition, schema) {

/**
 * CDC reader obtained from [[provider]] for the batch range given in the source options.
 * Being a lazy val, it is only created on first access.
 */
private lazy val cdcReader: StateStoreCDCReader = {
  // Resolve the provider first, exactly as a direct `provider.getStateStoreCDCReader(...)`
  // call would, before the options are unwrapped.
  val storeProvider = provider
  val options = partition.sourceOptions
  // Both ids are unwrapped with `.get`, which throws if the option is empty, and are
  // shifted by one before being handed to the provider.
  // NOTE(review): the shift presumably maps batch ids to state store versions - confirm.
  val shiftedStart = options.cdcStartBatchID.get + 1
  val shiftedEnd = options.cdcEndBatchId.get + 1
  storeProvider.getStateStoreCDCReader(shiftedStart, shiftedEnd)
}

/**
 * Rows served by this reader: every entry produced by [[cdcReader]], converted to an
 * [[InternalRow]] by `unifyStateCDCRow`. Replaces the iterator of the parent reader.
 */
override protected lazy val iter: Iterator[InternalRow] =
  cdcReader.iterator.map(cdcRow => unifyStateCDCRow(cdcRow))

/**
 * Releases the resources held by this reader.
 *
 * Drops the reference to the current row, closes the CDC reader and then closes the state
 * store provider. The provider is closed in a `finally` block so that it is still released
 * when obtaining or closing the CDC reader throws. This override does not call
 * `super.close()`.
 */
override def close(): Unit = {
  current = null
  try {
    // cdcReader is a lazy val, so this access creates it if it was never used before.
    cdcReader.close()
  } finally {
    provider.close()
  }
}

private def unifyStateCDCRow(row: (RecordType, UnsafeRow, UnsafeRow, Long)): InternalRow = {
val result = new GenericInternalRow(5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ abstract class StateStoreCDCReader(

private var currentVersion = startVersion - 1

/**
 * Returns the version associated with the result of the most recent [[next]] call.
 *
 * `currentVersion` is initialised to `startVersion - 1`, so that is the value reported
 * until it is updated (presumably by [[next]] - confirm against the implementation).
 */
def getVersion: Long = currentVersion

override def hasNext: Boolean = currentVersion < endVersion
Expand Down

0 comments on commit 752cdc7

Please sign in to comment.