Skip to content

Commit

Permalink
reflect more comments from Jungtaek
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 27, 2024
1 parent 42d952f commit 6f1425d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
*/
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
new HDFSBackedStateStore(endVersion, newMap)
Expand All @@ -917,9 +918,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
ReadStateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for readonly")
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for read-only")
new HDFSBackedReadStateStore(endVersion, newMap)
}

Expand All @@ -935,14 +937,13 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
if (snapshotVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion)
}
if (endVersion < snapshotVersion || endVersion < 0) {
if (endVersion < snapshotVersion) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}

val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
if (endVersion != 0) {
newMap.putAll(constructMapFromSnapshot(snapshotVersion, endVersion))
}
newMap.putAll(constructMapFromSnapshot(snapshotVersion, endVersion))

newMap
}
catch {
Expand Down Expand Up @@ -972,7 +973,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
resultMap
}

logDebug(s"Loading state from $snapshotVersion to $endVersion takes $elapsedMs ms.")
logDebug(s"Loading snapshot at version $snapshotVersion and apply delta files to version " +
s"$endVersion takes $elapsedMs ms.")

result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,14 @@ class RocksDB(
acquire(LoadStore)
recordedMetrics = None
logInfo(
log"Loading ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
log"${MDC(LogKeys.VERSION_NUM, snapshotVersion)}")
log"Loading snapshot at version ${MDC(LogKeys.VERSION_NUM, snapshotVersion)} and apply " +
log"changelog files to version ${MDC(LogKeys.VERSION_NUM, endVersion)}.")
try {
replayFromCheckpoint(snapshotVersion, endVersion)

logInfo(
log"Loaded ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
log"${MDC(LogKeys.VERSION_NUM, snapshotVersion)}")
log"Loaded snapshot at version ${MDC(LogKeys.VERSION_NUM, snapshotVersion)} and apply " +
log"changelog files to version ${MDC(LogKeys.VERSION_NUM, endVersion)}.")
} catch {
case t: Throwable =>
loadedVersion = -1 // invalidate loaded data
Expand All @@ -267,29 +267,27 @@ class RocksDB(
* @param endVersion end version
*/
private def replayFromCheckpoint(snapshotVersion: Long, endVersion: Long): Any = {
if (loadedVersion != snapshotVersion) {
closeDB()
val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, workingDir)
loadedVersion = snapshotVersion

// reset last snapshot version
if (lastSnapshotVersion > snapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
}
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being track
-1L
} else if (metadata.numKeys < 0) {
// we track the total number of rows, but the snapshot doesn't have tracking number
// need to count keys now
countKeys()
} else {
metadata.numKeys
}
closeDB()
val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, workingDir)
loadedVersion = snapshotVersion

// reset last snapshot version
if (lastSnapshotVersion > snapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
}
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being track
-1L
} else if (metadata.numKeys < 0) {
// we track the total number of rows, but the snapshot doesn't have tracking number
// need to count keys now
countKeys()
} else {
metadata.numKeys
}
if (loadedVersion != endVersion) replayChangelog(endVersion)
// After changelog replay the numKeysOnWritingVersion will be updated to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.util.Utils

private[sql] class RocksDBStateStoreProvider
extends StateStoreProvider with Logging with Closeable
with SupportsFineGrainedReplay {
with SupportsFineGrainedReplay {
import RocksDBStateStoreProvider._

class RocksDBStateStore(lastVersion: Long) extends StateStore {
Expand Down Expand Up @@ -384,22 +384,6 @@ private[sql] class RocksDBStateStoreProvider
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}

/**
 * Rebuild a state store by loading the snapshot at `snapshotVersion` and replaying
 * changelog files up to `endVersion`, then expose it at `endVersion`.
 *
 * @param snapshotVersion version of the snapshot to start from (must be >= 1)
 * @param endVersion target version to replay up to (must not precede the snapshot)
 * @throws QueryExecutionErrors.cannotLoadStore wrapping any failure, including the
 *         version-validation errors raised below
 */
override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
  try {
    // Reject impossible version combinations up front: a snapshot version below 1
    // cannot exist, and replaying backwards (endVersion before the snapshot) is invalid.
    val invalidVersion =
      if (snapshotVersion < 1) Some(snapshotVersion)
      else if (endVersion < snapshotVersion) Some(endVersion)
      else None
    invalidVersion.foreach { v =>
      throw QueryExecutionErrors.unexpectedStateStoreVersion(v)
    }
    rocksDB.loadFromSnapshot(snapshotVersion, endVersion)
    new RocksDBStateStore(endVersion)
  } catch {
    // NOTE(review): Throwable (not NonFatal) is caught here to match the original
    // behavior — every failure is rewrapped as a cannotLoadStore error.
    case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
  }
}
}

object RocksDBStateStoreProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
}

protected def testSnapshotNotFound(): Unit = {
withTempDir(tempDir => {
withTempDir { tempDir =>
val provider = getNewStateStoreProvider(tempDir.getAbsolutePath)
for (i <- 1 to 4) {
val store = provider.getStore(i - 1)
Expand All @@ -989,11 +989,11 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
.replayReadStateFromSnapshot(1, 2)
}
checkError(exc, "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED")
})
}
}

protected def testGetReadStoreWithStartVersion(): Unit = {
withTempDir(tempDir => {
withTempDir { tempDir =>
val provider = getNewStateStoreProvider(tempDir.getAbsolutePath)
for (i <- 1 to 4) {
val store = provider.getStore(i - 1)
Expand All @@ -1012,11 +1012,11 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
assert(get(result, "a", 4).isEmpty)

provider.close()
})
}
}

protected def testSnapshotPartitionId(): Unit = {
withTempDir(tempDir => {
withTempDir { tempDir =>
val inputData = MemoryStream[Int]
val df = inputData.toDF().limit(10)

Expand All @@ -1039,16 +1039,15 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
val stateDfError = spark.read.format("statestore")
.option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
.option(
StateSourceOptions.SNAPSHOT_PARTITION_ID,
spark.sessionState.conf.numShufflePartitions)
StateSourceOptions.SNAPSHOT_PARTITION_ID, 1)
.option(StateSourceOptions.BATCH_ID, 0)
.load(tempDir.getAbsolutePath)

val exc = intercept[StateStoreSnapshotPartitionNotFound] {
stateDfError.show()
}
assert(exc.getErrorClass === "CANNOT_LOAD_STATE_STORE.SNAPSHOT_PARTITION_ID_NOT_FOUND")
})
}
}

// Todo: Should also test against state generated by 3.5
Expand Down

0 comments on commit 6f1425d

Please sign in to comment.