move StateStoreChangeDataReader to other files and delete it
eason-yuchen-liu committed Jul 1, 2024
1 parent 104ba9c commit 6d6d511
Showing 5 changed files with 154 additions and 177 deletions.
@@ -986,3 +986,39 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
keySchema, valueSchema)
}
}

/** [[StateStoreChangeDataReader]] implementation for [[HDFSBackedStateStoreProvider]] */
class HDFSBackedStateStoreChangeDataReader(
    fm: CheckpointFileManager,
    stateLocation: Path,
    startVersion: Long,
    endVersion: Long,
    compressionCodec: CompressionCodec,
    keySchema: StructType,
    valueSchema: StructType)
  extends StateStoreChangeDataReader(
    fm, stateLocation, startVersion, endVersion, compressionCodec) {

  override protected var changelogSuffix: String = "delta"

  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
    val reader = currentChangelogReader()
    if (reader == null) {
      return null
    }
    val (recordType, keyArray, valueArray, _) = reader.next()
    val keyRow = new UnsafeRow(keySchema.fields.length)
    keyRow.pointTo(keyArray, keyArray.length)
    if (valueArray == null) {
      (recordType, keyRow, null, currentChangelogVersion - 1)
    } else {
      val valueRow = new UnsafeRow(valueSchema.fields.length)
      // If valueSize in the existing file is not a multiple of 8, floor it to a multiple of 8.
      // This is a workaround for the following:
      // Prior to Spark 2.3, `RowBasedKeyValueBatch` mistakenly appended 4 bytes to the value
      // row, which gets persisted into the checkpoint data.
      valueRow.pointTo(valueArray, (valueArray.length / 8) * 8)
      (recordType, keyRow, valueRow, currentChangelogVersion - 1)
    }
  }
}
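
A minimal consumption sketch (not part of this commit): because StateStoreChangeDataReader extends NextIterator, the HDFS-backed reader can be driven through the standard Iterator API. The object and helper names below are hypothetical, and the imports assume the reader keeps its place in the existing state package.

import org.apache.hadoop.fs.Path
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreChangeDataReader
import org.apache.spark.sql.types.StructType

// Hypothetical helper object for debugging a change feed.
object ChangeFeedDebug {
  // Replay all changes between two state store versions and print them.
  def printChanges(
      fm: CheckpointFileManager,
      stateLocation: Path,
      startVersion: Long,
      endVersion: Long,
      codec: CompressionCodec,
      keySchema: StructType,
      valueSchema: StructType): Unit = {
    val reader = new HDFSBackedStateStoreChangeDataReader(
      fm, stateLocation, startVersion, endVersion, codec, keySchema, valueSchema)
    try {
      // NextIterator exposes the standard Iterator API; the value row is null for deletions.
      reader.foreach { case (recordType, key, value, batchId) =>
        println(s"batch=$batchId type=$recordType key=$key value=$value")
      }
    } finally {
      reader.close()
    }
  }
}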
@@ -18,10 +18,12 @@
package org.apache.spark.sql.execution.streaming.state

import java.io._
import java.util.concurrent.ConcurrentHashMap

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.{Logging, MDC}
@@ -494,3 +496,34 @@ object RocksDBStateStoreProvider {
CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE, CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES,
CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES)
}

/** [[StateStoreChangeDataReader]] implementation for [[RocksDBStateStoreProvider]] */
class RocksDBStateStoreChangeDataReader(
    fm: CheckpointFileManager,
    stateLocation: Path,
    startVersion: Long,
    endVersion: Long,
    compressionCodec: CompressionCodec,
    keyValueEncoderMap:
      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, RocksDBValueStateEncoder)])
  extends StateStoreChangeDataReader(
    fm, stateLocation, startVersion, endVersion, compressionCodec) {

  override protected var changelogSuffix: String = "changelog"

  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
    val reader = currentChangelogReader()
    if (reader == null) {
      return null
    }
    val (recordType, keyArray, valueArray, columnFamily) = reader.next()
    val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) = keyValueEncoderMap.get(columnFamily)
    val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
    if (valueArray == null) {
      (recordType, keyRow, null, currentChangelogVersion - 1)
    } else {
      val valueRow = rocksDBValueStateEncoder.decodeValue(valueArray)
      (recordType, keyRow, valueRow, currentChangelogVersion - 1)
    }
  }
}
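
Another sketch, not part of the commit: the raw 4-tuple returned by getNext() can be awkward to hand to downstream code, so a small wrapper such as the hypothetical case class below is one way to carry decoded change records. It assumes RecordType and the reader remain accessible from the existing state package.

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.state.{RecordType, RocksDBStateStoreChangeDataReader}

// Hypothetical wrapper for a single decoded change record.
case class StateChangeRecord(
    recordType: RecordType.Value,
    key: UnsafeRow,
    value: UnsafeRow, // null for deletion records
    batchId: Long)

object ChangeFeedAdapter {
  // Drain a reader eagerly into wrapper objects, then close it.
  def collectChanges(reader: RocksDBStateStoreChangeDataReader): List[StateChangeRecord] =
    try {
      reader.map { case (recordType, key, value, batchId) =>
        StateChangeRecord(recordType, key, value, batchId)
      }.toList
    } finally {
      reader.close()
    }
}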

This file was deleted. (Per the commit message, the StateStoreChangeDataReader it contained was moved into the other changed files.)

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.{FSError, Path}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
@@ -399,3 +400,83 @@ class StateStoreChangelogReaderV2(
}
}
}

/**
 * Base class representing an iterator that iterates over a range of changelog files in a state
 * store. In each iteration, it returns a tuple of (changeType: [[RecordType]],
 * nested key: [[UnsafeRow]], nested value: [[UnsafeRow]], batchId: [[Long]]).
 *
 * @param fm checkpoint file manager used to manage the streaming query checkpoint
 * @param stateLocation location of the state store
 * @param startVersion start version of the changelog files to read
 * @param endVersion end version of the changelog files to read
 * @param compressionCodec decompression codec used for reading the changelog files
 */
abstract class StateStoreChangeDataReader(
    fm: CheckpointFileManager,
    stateLocation: Path,
    startVersion: Long,
    endVersion: Long,
    compressionCodec: CompressionCodec)
  extends NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)] with Logging {

  assert(startVersion >= 1)
  assert(endVersion >= startVersion)

  /**
   * Iterator that iterates over the changelog files in the state store.
   */
  private class ChangeLogFileIterator extends Iterator[Path] {

    private var currentVersion = StateStoreChangeDataReader.this.startVersion - 1

    /** returns the version of the changelog returned by the latest [[next]] function call */
    def getVersion: Long = currentVersion

    override def hasNext: Boolean = currentVersion < StateStoreChangeDataReader.this.endVersion

    override def next(): Path = {
      currentVersion += 1
      getChangelogPath(currentVersion)
    }

    private def getChangelogPath(version: Long): Path =
      new Path(
        StateStoreChangeDataReader.this.stateLocation,
        s"$version.${StateStoreChangeDataReader.this.changelogSuffix}")
  }

  /** file format of the changelog files */
  protected var changelogSuffix: String
  private lazy val fileIterator = new ChangeLogFileIterator
  private var changelogReader: StateStoreChangelogReader = null

  /**
   * Get a changelog reader that has at least one record left to read. If there are no readers
   * left, return null.
   */
  protected def currentChangelogReader(): StateStoreChangelogReader = {
    while (changelogReader == null || !changelogReader.hasNext) {
      if (changelogReader != null) {
        changelogReader.close()
      }
      if (!fileIterator.hasNext) {
        finished = true
        return null
      }
      // TODO: does not support StateStoreChangelogReaderV2
      changelogReader =
        new StateStoreChangelogReaderV1(fm, fileIterator.next(), compressionCodec)
    }
    changelogReader
  }

  /** get the version of the current changelog reader */
  protected def currentChangelogVersion: Long = fileIterator.getVersion

  override def close(): Unit = {
    if (changelogReader != null) {
      changelogReader.close()
    }
  }
}
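
Illustrative sketch only: the private ChangeLogFileIterator above resolves exactly one changelog file per version in [startVersion, endVersion], named "<version>.<changelogSuffix>" under the state location. The standalone helper below (hypothetical object name and example path) reproduces that mapping.

import org.apache.hadoop.fs.Path

object ChangelogPathDemo {
  // Enumerate the changelog paths the iterator would visit for a version range and suffix.
  def changelogPaths(
      stateLocation: Path, startVersion: Long, endVersion: Long, suffix: String): Seq[Path] =
    (startVersion to endVersion).map(version => new Path(stateLocation, s"$version.$suffix"))

  // e.g. changelogPaths(new Path("/tmp/checkpoint/state/0/0"), 1L, 3L, "changelog")
  //      returns .../1.changelog, .../2.changelog, .../3.changelog
}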
@@ -27,13 +27,13 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

-class HDFSBackedStateDataSourceReadChangeDataSuite extends StateDataSourceChangeDataReadSuite {
+class HDFSBackedStateDataSourceChangeDataReaderSuite extends StateDataSourceChangeDataReaderSuite {
  override protected def newStateStoreProvider(): HDFSBackedStateStoreProvider =
    new HDFSBackedStateStoreProvider
}

class RocksDBWithChangelogCheckpointStateDataSourceChangeDataReaderSuite extends
-  StateDataSourceChangeDataReadSuite {
+  StateDataSourceChangeDataReaderSuite {
  override protected def newStateStoreProvider(): RocksDBStateStoreProvider =
    new RocksDBStateStoreProvider

@@ -44,7 +44,8 @@ StateDataSourceChangeDataReadSuite {
}
}

-abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBase with Assertions {
+abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestBase
+  with Assertions {

  import testImplicits._
  import StateStoreTestsHelper._
