Skip to content

Commit

Permalink
Reflect more comments from Anish
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 22, 2024
1 parent 2eb6646 commit be30817
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@
"Error reading snapshot file <fileToRead> of <clazz>: key size cannot be <keySize>."
]
},
"CANNOT_READ_SNAPSHOT_FILE_NOT_EXISTS" : {
"CANNOT_READ_MISSING_SNAPSHOT_FILE" : {
"message" : [
"Error reading snapshot file <fileToRead> of <clazz>: <fileToRead> does not exist."
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

/**
* Get the state store of endVersion for reading by applying delta files on the snapshot of
* startVersion. If startVersion does not exist, an error will be thrown.
* startVersion. If snapshot for startVersion does not exist, an error will be thrown.
*
* @param startVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
Expand All @@ -287,7 +287,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

/**
* Get the state store of endVersion for reading by applying delta files on the snapshot of
* startVersion. If startVersion does not exist, an error will be thrown.
* startVersion. If snapshot for startVersion does not exist, an error will be thrown.
*
* @param startVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
Expand Down Expand Up @@ -317,6 +317,12 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}
}

/**
* Consturct the state at endVersion from snapshot from startVersion.
* Returns a new [[HDFSBackedStateStoreMap]]
* @param startVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
*/
private def getLoadedMapForStore(startVersion: Long, endVersion: Long):
HDFSBackedStateStoreMap = synchronized {
try {
Expand Down Expand Up @@ -598,9 +604,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}

private def loadMap(startVersion: Long, endVersion: Long): HDFSBackedStateStoreMap = {

val (result, elapsedMs) = Utils.timeTakenMs {
val startVersionMap = Option(loadedMaps.get(startVersion)) match {
val startVersionMap = synchronized { Option(loadedMaps.get(startVersion)) } match {
case Some(value) =>
loadedMapCacheHitCount.increment()
Option(value)
Expand Down Expand Up @@ -773,8 +778,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}

/**
* try to read the snapshot file of the given version.
* If the snapshot file is not available, return [[None]].
* Try to read the snapshot file. If the snapshot file is not available, return [[None]].
*
* @param version the version of the snapshot file
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,14 @@ class RocksDB(
}

/**
* Load from the start checkpoint version and apply all the changelog records to reach the
* end version. Note that this will copy all the necessary file from DFS to local disk as needed,
* Load from the start snapshot version and apply all the changelog records to reach the
* end version. Note that this will copy all the necessary files from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*
* @param startVersion
* @param endVersion
* @param readOnly
* @return
* @param startVersion version of the snapshot to start with
* @param endVersion end version
* @param readOnly whether the RocksDB instance is read-only
* @return A RocksDB instance loaded with the state endVersion replayed from startVersion
*/
def load(startVersion: Long, endVersion: Long, readOnly: Boolean): RocksDB = {
assert(startVersion >= 0 && endVersion >= startVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,13 @@ object StateStoreErrors {
}

def stateStoreSnapshotFileNotFound(fileToRead: String, clazz: String):
StateStoreSnapshotFileNotFound = {
StateStoreSnapshotFileNotFound = {
new StateStoreSnapshotFileNotFound(fileToRead, clazz)
}

def stateStoreSnapshotPartitionNotFound(
snapshotPartitionId: Long, operatorId: Int, checkpointLocation: String):
StateStoreSnapshotPartitionNotFound = {
snapshotPartitionId: Long, operatorId: Int, checkpointLocation: String):
StateStoreSnapshotPartitionNotFound = {
new StateStoreSnapshotPartitionNotFound(snapshotPartitionId, operatorId, checkpointLocation)
}
}
Expand Down Expand Up @@ -268,7 +268,7 @@ class StateStoreValueSchemaNotCompatible(

class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
extends SparkUnsupportedOperationException(
errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_NOT_EXISTS",
errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE",
messageParameters = Map(
"fileToRead" -> fileToRead,
"clazz" -> clazz))
Expand Down

0 comments on commit be30817

Please sign in to comment.