Skip to content

Commit

Permalink
Allow RocksDB to reconstruct state from a specific checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 7, 2024
1 parent 2475173 commit 07267b5
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -576,29 +576,6 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
private def loadMap(startVersion: Long, endVersion: Long): HDFSBackedStateStoreMap = {

val (result, elapsedMs) = Utils.timeTakenMs {
// val snapshotCurrentVersionMap = readSnapshotFile(version)
// if (snapshotCurrentVersionMap.isDefined) {
// synchronized { putStateIntoStateCacheMap(version, snapshotCurrentVersionMap.get) }
// return snapshotCurrentVersionMap.get
// }

// // Find the most recent map before this version that we can.
// // [SPARK-22305] This must be done iteratively to avoid stack overflow.
// var lastAvailableVersion = version
// var lastAvailableMap: Option[HDFSBackedStateStoreMap] = None
// while (lastAvailableMap.isEmpty) {
// lastAvailableVersion -= 1

// if (lastAvailableVersion <= 0) {
// // Use an empty map for versions 0 or less.
// lastAvailableMap = Some(HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey))
// } else {
// lastAvailableMap =
// synchronized { Option(loadedMaps.get(lastAvailableVersion)) }
// .orElse(readSnapshotFile(lastAvailableVersion))
// }
// }

val startVersionMap =
synchronized { Option(loadedMaps.get(startVersion)) }
.orElse{
Expand All @@ -623,7 +600,6 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
resultMap
}

// todo
logDebug(s"Loading state from $startVersion to $endVersion takes $elapsedMs ms.")

result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,38 +180,10 @@ class RocksDB(
logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}")
try {
if (loadedVersion != version) {
closeDB()
val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
loadedVersion = latestSnapshotVersion

// reset last snapshot version
if (lastSnapshotVersion > latestSnapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
}
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being track
-1L
} else if (metadata.numKeys < 0) {
// we track the total number of rows, but the snapshot doesn't have tracking number
// need to count keys now
countKeys()
} else {
metadata.numKeys
}
if (loadedVersion != version) replayChangelog(version)
// After changelog replay the numKeysOnWritingVersion will be updated to
// the correct number of keys in the loaded version.
numKeysOnLoadedVersion = numKeysOnWritingVersion
fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
}
if (conf.resetStatsOnLoad) {
nativeStats.reset
loadFromCheckpoint(latestSnapshotVersion, version)
}

logInfo(log"Loaded ${MDC(LogKeys.VERSION_NUM, version)}")
} catch {
case t: Throwable =>
Expand All @@ -226,6 +198,86 @@ class RocksDB(
this
}

/**
 * Load the state as of `endVersion`, using the checkpoint at `startVersion` as the base and
 * applying the changelog records needed to reach `endVersion`. Note that this will copy all
 * the necessary files from DFS to local disk as needed, and possibly restart the native
 * RocksDB instance.
 *
 * @param startVersion version of the checkpoint to start from; asserted to be non-negative
 * @param endVersion   version to reach; asserted to be no smaller than `startVersion`
 * @param readOnly     when false and changelog checkpointing is enabled, any existing changelog
 *                     writer is aborted and a new one is opened for `endVersion + 1`
 * @return this RocksDB instance
 */
def load(startVersion: Long, endVersion: Long, readOnly: Boolean): RocksDB = {
  assert(startVersion >= 0 && endVersion >= startVersion)
  acquire(LoadStore)
  recordedMetrics = None
  logInfo(
    log"Loading ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
    log"${MDC(LogKeys.VERSION_NUM, startVersion)}")

  try {
    loadFromCheckpoint(startVersion, endVersion)
    logInfo(
      log"Loaded ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
      log"${MDC(LogKeys.VERSION_NUM, startVersion)}")
  } catch {
    case failure: Throwable =>
      // Mark the loaded data as invalid before rethrowing the failure unchanged.
      loadedVersion = -1
      throw failure
  }

  val shouldOpenWriter = enableChangelogCheckpointing && !readOnly
  if (shouldOpenWriter) {
    // Abort a previously opened writer first so that we don't leak resource.
    for (staleWriter <- changelogWriter) staleWriter.abort()
    val nextVersion = endVersion + 1
    changelogWriter = Some(fileManager.getChangeLogWriter(nextVersion, useColumnFamilies))
  }
  this
}

/**
 * Bring the local RocksDB instance to `endVersion`, using the checkpoint at `startVersion` as
 * the base and replaying changelog records on top of it.
 *
 * The checkpoint is fetched from DFS only when the currently loaded version differs from
 * `startVersion`. The original author notes that an exception is thrown if the start version
 * does not exist; presumably that is raised by `fileManager.loadCheckpointFromDfs`, so it can
 * only surface when a reload actually happens.
 *
 * This method is called purely for its side effects and returns nothing.
 *
 * @param startVersion start checkpoint version
 * @param endVersion end version
 */
def loadFromCheckpoint(startVersion: Long, endVersion: Long): Unit = {
  // NOTE(review): when loadedVersion already equals startVersion the checkpoint is not
  // re-read, so the state is not rebuilt from the checkpoint files in that case -- confirm
  // this is intended for callers that explicitly ask for a specific start checkpoint.
  if (loadedVersion != startVersion) {
    closeDB()
    val metadata = fileManager.loadCheckpointFromDfs(startVersion, workingDir)
    loadedVersion = startVersion

    // reset last snapshot version
    if (lastSnapshotVersion > startVersion) {
      // discard any newer snapshots
      lastSnapshotVersion = 0L
      latestSnapshot = None
    }
    openDB()

    numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
      // we don't track the total number of rows - discard the number being tracked
      -1L
    } else if (metadata.numKeys < 0) {
      // we track the total number of rows, but the snapshot doesn't have tracking number
      // need to count keys now
      countKeys()
    } else {
      metadata.numKeys
    }
  }

  if (loadedVersion != endVersion) replayChangelog(endVersion)
  // After changelog replay the numKeysOnWritingVersion will be updated to
  // the correct number of keys in the loaded version.
  numKeysOnLoadedVersion = numKeysOnWritingVersion
  fileManagerMetrics = fileManager.latestLoadCheckpointMetrics

  if (conf.resetStatsOnLoad) {
    nativeStats.reset
  }
}

/**
* Replay change log from the loaded version to the target version.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,22 @@ private[sql] class RocksDBStateStoreProvider
}
}

// Returns a state store for `endVersion`, loaded in read-only mode starting from the
// checkpoint at `startVersion`. Every failure, including the version validation errors
// raised below, is rethrown wrapped by `QueryExecutionErrors.cannotLoadStore`.
override def getReadStore(startVersion: Long, endVersion: Long): StateStore = {
  try {
    // Pick the offending version, if any: a negative start version is reported first,
    // otherwise an end version that precedes the start version.
    val invalidVersion: Option[Long] =
      if (startVersion < 0) {
        Some(startVersion)
      } else if (endVersion < startVersion) {
        Some(endVersion)
      } else {
        None
      }
    invalidVersion.foreach { badVersion =>
      throw QueryExecutionErrors.unexpectedStateStoreVersion(badVersion)
    }

    rocksDB.load(startVersion, endVersion, readOnly = true)
    new RocksDBStateStore(endVersion)
  } catch {
    case cause: Throwable => throw QueryExecutionErrors.cannotLoadStore(cause)
  }
}

override def doMaintenance(): Unit = {
try {
rocksDB.doMaintenance()
Expand Down
14 changes: 10 additions & 4 deletions state-store-content-check.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from pyspark.sql.functions import window, col

spark = SparkSession.builder\
.config("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")\
.config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", True)\
.config("spark.sql.shuffle.partitions", 4).getOrCreate()


# aggregate operator
q1 = spark.readStream.format("rate").option("rowsPerSecond", 3).load().withWatermark("timestamp", "50 seconds")\
q1 = spark.readStream.format("rate").option("rowsPerSecond", 3).load().withWatermark("timestamp", "30 seconds")\
.groupBy(window("timestamp", "10 seconds")).count().select("window.start", "window.end", "count")\
.writeStream.format("memory").queryName("window").option("checkpointLocation", "/tmp/state/window").trigger(processingTime="50 seconds").start()
.writeStream.format("memory").queryName("window").option("checkpointLocation", "/tmp/state/window").trigger(processingTime="30 seconds").start()

# join operator
sdf1 = spark.readStream.format("rate").option("rowsPerSecond", 100).load().withWatermark("timestamp", "50 seconds")
Expand All @@ -24,9 +30,9 @@


state1_1 = spark.read.format("statestore")\
.option("snapshotStartBatchId", 11)\
.option("snapshotStartBatchId", 1)\
.option("snapshotPartitionId", 1)\
.load("/Users/yuchen.liu/Desktop/spark/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.0-state-source").show()
.load("/tmp/state/window").show()


state1_2 = spark.read.format("statestore").option("batchId", 53).load("/tmp/state/window").show()
Expand Down

0 comments on commit 07267b5

Please sign in to comment.