
Commit: refactor StateStoreChangeDataReader
eason-yuchen-liu committed Jun 28, 2024
1 parent b1eb8c4 commit 15a8316
Showing 1 changed file with 80 additions and 90 deletions.
@@ -29,53 +29,83 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.NextIterator
 
 /**
- * Base class for state store changelog reader
- * @param fm - checkpoint file manager used to manage streaming query checkpoint
- * @param fileToRead - name of file to use to read changelog
- * @param compressionCodec - de-compression method using for reading changelog file
+ * Base class representing an iterator that iterates over a range of changelog files in a state
+ * store. In each iteration, it will return a tuple of (changeType: [[RecordType]],
+ * nested key: [[UnsafeRow]], nested value: [[UnsafeRow]], batchId: [[Long]]).
+ *
+ * @param fm checkpoint file manager used to manage streaming query checkpoint
+ * @param stateLocation location of the state store
+ * @param startVersion start version of the changelog file to read
+ * @param endVersion end version of the changelog file to read
+ * @param compressionCodec decompression method used for reading changelog file
 */
 abstract class StateStoreChangeDataReader(
     fm: CheckpointFileManager,
     stateLocation: Path,
     startVersion: Long,
     endVersion: Long,
     compressionCodec: CompressionCodec)
   extends NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)] with Logging {
 
-  class ChangeLogFileIterator(
-      stateLocation: Path,
-      startVersion: Long,
-      endVersion: Long) extends Iterator[Path] {
-    assert(startVersion >= 1)
-    assert(endVersion >= startVersion)
+  assert(startVersion >= 1)
+  assert(endVersion >= startVersion)
+
+  /**
+   * Iterator that iterates over the changelog files in the state store.
+   */
+  private class ChangeLogFileIterator extends Iterator[Path] {
 
-    private var currentVersion = startVersion - 1
+    private var currentVersion = StateStoreChangeDataReader.this.startVersion - 1
 
-    /**
-     * returns the version of the return of the latest [[next]] function call
-     */
+    /** returns the version of the changelog returned by the latest [[next]] function call */
     def getVersion: Long = currentVersion
 
-    override def hasNext: Boolean = currentVersion < endVersion
+    override def hasNext: Boolean = currentVersion < StateStoreChangeDataReader.this.endVersion
 
     override def next(): Path = {
       currentVersion += 1
-      getChangelogPath(stateLocation, currentVersion)
+      getChangelogPath(currentVersion)
     }
-  }
 
-  protected lazy val fileIterator =
-    new ChangeLogFileIterator(stateLocation, startVersion, endVersion)
+    private def getChangelogPath(version: Long): Path =
+      new Path(
+        StateStoreChangeDataReader.this.stateLocation,
+        s"$version.${StateStoreChangeDataReader.this.changelogSuffix}")
+  }
 
   /** file suffix of the changelog files */
   protected var changelogSuffix: String
+  private lazy val fileIterator = new ChangeLogFileIterator
+  private var changelogReader: StateStoreChangelogReader = null
 
-  private def getChangelogPath(stateLocation: Path, version: Long): Path =
-    new Path(stateLocation, s"$version.$changelogSuffix")
-
-  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long)
+  /**
+   * Get a changelog reader that has at least one record left to read. If there are no readers
+   * left, return null.
+   */
+  protected def currentChangelogReader(): StateStoreChangelogReader = {
+    while (changelogReader == null || !changelogReader.hasNext) {
+      if (changelogReader != null) {
+        changelogReader.close()
+      }
+      if (!fileIterator.hasNext) {
+        finished = true
+        return null
+      }
+      // TODO: does not support StateStoreChangelogReaderV2
+      changelogReader =
+        new StateStoreChangelogReaderV1(fm, fileIterator.next(), compressionCodec)
+    }
+    changelogReader
+  }
+
+  /** get the version of the current changelog reader */
+  protected def currentChangelogVersion: Long = fileIterator.getVersion
 
-  def close(): Unit
+  override def close(): Unit = {
+    if (changelogReader != null) {
+      changelogReader.close()
+    }
+  }
 }
 
 class HDFSBackedStateStoreCDCReader(
@@ -85,100 +115,60 @@ class HDFSBackedStateStoreCDCReader(
     endVersion: Long,
     compressionCodec: CompressionCodec,
     keySchema: StructType,
-    valueSchema: StructType
-  )
+    valueSchema: StructType)
   extends StateStoreChangeDataReader(
     fm, stateLocation, startVersion, endVersion, compressionCodec) {
-  override protected var changelogSuffix: String = "delta"
 
-  private var currentChangelogReader: StateStoreChangelogReader = null
+  override protected var changelogSuffix: String = "delta"
 
   override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
-    while (currentChangelogReader == null || !currentChangelogReader.hasNext) {
-      if (currentChangelogReader != null) {
-        currentChangelogReader.close()
-      }
-      if (!fileIterator.hasNext) {
-        finished = true
-        return null
-      }
-      currentChangelogReader =
-        new StateStoreChangelogReaderV1(fm, fileIterator.next(), compressionCodec)
+    val reader = currentChangelogReader()
+    if (reader == null) {
+      return null
     }
 
-    val readResult = currentChangelogReader.next()
+    val (recordType, keyArray, valueArray, _) = reader.next()
     val keyRow = new UnsafeRow(keySchema.fields.length)
-    keyRow.pointTo(readResult._2, readResult._2.length)
-    if (readResult._3 == null) {
-      (readResult._1, keyRow, null, fileIterator.getVersion - 1)
+    keyRow.pointTo(keyArray, keyArray.length)
+    if (valueArray == null) {
+      (recordType, keyRow, null, currentChangelogVersion - 1)
     } else {
       val valueRow = new UnsafeRow(valueSchema.fields.length)
       // If valueSize in the existing file is not a multiple of 8, floor it to a multiple of 8.
       // This is a workaround for the following issue:
       // prior to Spark 2.3, `RowBasedKeyValueBatch` mistakenly appended 4 bytes to the value
      // row, which got persisted into the checkpoint data
-      valueRow.pointTo(readResult._3, (readResult._3.length / 8) * 8)
-      (readResult._1, keyRow, valueRow, fileIterator.getVersion - 1)
+      valueRow.pointTo(valueArray, (valueArray.length / 8) * 8)
+      (recordType, keyRow, valueRow, currentChangelogVersion - 1)
    }
   }
-
-  override def close(): Unit = {
-    if (currentChangelogReader != null) {
-      currentChangelogReader.close()
-    }
-  }
 }
 
 class RocksDBStateStoreCDCReader(
-  fm: CheckpointFileManager,
-  stateLocation: Path,
-  startVersion: Long,
-  endVersion: Long,
-  compressionCodec: CompressionCodec,
-  keyValueEncoderMap: ConcurrentHashMap[String, (RocksDBKeyStateEncoder, RocksDBValueStateEncoder)]
-  )
+    fm: CheckpointFileManager,
+    stateLocation: Path,
+    startVersion: Long,
+    endVersion: Long,
+    compressionCodec: CompressionCodec,
+    keyValueEncoderMap:
+      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, RocksDBValueStateEncoder)])
   extends StateStoreChangeDataReader(
     fm, stateLocation, startVersion, endVersion, compressionCodec) {
-  override protected var changelogSuffix: String = "changelog"
 
-  private var currentChangelogReader: StateStoreChangelogReader = null
+  override protected var changelogSuffix: String = "changelog"
 
   override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
-    while (currentChangelogReader == null || !currentChangelogReader.hasNext) {
-      if (currentChangelogReader != null) {
-        currentChangelogReader.close()
-        currentChangelogReader = null
-      }
-      if (!fileIterator.hasNext) {
-        finished = true
-        return null
-      }
-      currentChangelogReader =
-        new StateStoreChangelogReaderV1(fm, fileIterator.next(), compressionCodec)
+    val reader = currentChangelogReader()
+    if (reader == null) {
+      return null
    }
 
-    val (recordType, keyArray, valueArray, columnFamily) = currentChangelogReader.next()
+    val (recordType, keyArray, valueArray, columnFamily) = reader.next()
     val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) = keyValueEncoderMap.get(columnFamily)
-    // val keyRow = new UnsafeRow(keySchema.fields.length)
-    // keyRow.pointTo(readResult._2, readResult._2.length)
     val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
     if (valueArray == null) {
-      (recordType, keyRow, null, fileIterator.getVersion - 1)
+      (recordType, keyRow, null, currentChangelogVersion - 1)
     } else {
-      // val valueRow = new UnsafeRow(valueSchema.fields.length)
-      // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
-      // This is a workaround for the following:
-      // Prior to Spark 2.3 mistakenly append 4 bytes to the value row in
-      // `RowBasedKeyValueBatch`, which gets persisted into the checkpoint data
-      // valueRow.pointTo(readResult._3, (readResult._3.length))
       val valueRow = rocksDBValueStateEncoder.decodeValue(valueArray)
-      (recordType, keyRow, valueRow, fileIterator.getVersion - 1)
+      (recordType, keyRow, valueRow, currentChangelogVersion - 1)
     }
   }
-
-  override def close(): Unit = {
-    if (currentChangelogReader != null) {
-      currentChangelogReader.close()
-    }
-  }
 }
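
The heart of this refactor is hoisting the duplicated reader-advancing loop out of each subclass's getNext() into the shared currentChangelogReader() helper, so the subclasses only decode records. Below is a minimal, self-contained Scala sketch of that pattern under illustrative names: SimpleNextIterator stands in for Spark's org.apache.spark.util.NextIterator and MultiFileLineReader for a concrete reader; none of these names are part of the commit.

import scala.io.Source

// Stand-in for Spark's NextIterator: getNext() either returns the next element
// or sets `finished = true` and returns null when the input is exhausted.
abstract class SimpleNextIterator[T >: Null] extends Iterator[T] {
  protected var finished = false
  private var gotNext = false
  private var nextValue: T = null

  protected def getNext(): T

  override def hasNext: Boolean = {
    if (!gotNext && !finished) {
      nextValue = getNext()
      gotNext = true
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("end of stream")
    gotNext = false
    nextValue
  }
}

// Mirrors the refactored base class: one helper advances across files,
// closing each drained reader before opening the next one.
class MultiFileLineReader(paths: Seq[String]) extends SimpleNextIterator[String] {
  private val fileIterator = paths.iterator
  private var source: Source = null
  private var lines: Iterator[String] = null

  // Analogue of currentChangelogReader(): return a reader with at least one
  // record left, or null (after setting `finished`) when every file is drained.
  private def currentReader(): Iterator[String] = {
    while (lines == null || !lines.hasNext) {
      if (source != null) source.close()
      if (!fileIterator.hasNext) {
        finished = true
        return null
      }
      source = Source.fromFile(fileIterator.next())
      lines = source.getLines()
    }
    lines
  }

  // Analogue of the slimmed-down getNext() in each subclass.
  override protected def getNext(): String = {
    val reader = currentReader()
    if (reader == null) null else reader.next()
  }

  def close(): Unit = if (source != null) source.close()
}

As in the commit, the helper owns the close-drained-reader/open-next-file loop in one place, so each concrete getNext() collapses to "get a live reader, decode one record" (e.g. new MultiFileLineReader(Seq("a.txt", "b.txt")).toList would concatenate the lines of both hypothetical files).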
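
One non-obvious line worth calling out: valueRow.pointTo(valueArray, (valueArray.length / 8) * 8) floors the buffer length to a multiple of 8 because UnsafeRow data is laid out in 8-byte words, which silently drops the 4 stray bytes that the pre-Spark-2.3 RowBasedKeyValueBatch bug appended to persisted value rows. A quick standalone arithmetic check, with illustrative values:

// Floor a byte length to a multiple of 8, as the reader does before pointTo().
def floorTo8(len: Int): Int = (len / 8) * 8

assert(floorTo8(36) == 32) // 32 valid bytes + 4 stray bytes -> the stray tail is dropped
assert(floorTo8(32) == 32) // already-aligned lengths pass through unchanged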
