Skip to content

Commit

Permalink
Rename startVersion to snapshotVersion to make its function clear
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 27, 2024
1 parent 40b6dc6 commit e15213e
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -892,55 +892,56 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

/**
* Get the state store of endVersion for reading by applying delta files on the snapshot of
* startVersion. If snapshot for startVersion does not exist, an error will be thrown.
* snapshotVersion. If snapshot for snapshotVersion does not exist, an error will be thrown.
*
* @param startVersion checkpoint version of the snapshot to start with
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
* @return [[HDFSBackedStateStore]]
*/
override def replayStateFromSnapshot(startVersion: Long, endVersion: Long): StateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(startVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, startVersion)} to " +
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
new HDFSBackedStateStore(endVersion, newMap)
}

/**
* Get the state store of endVersion for reading by applying delta files on the snapshot of
* startVersion. If snapshot for startVersion does not exist, an error will be thrown.
* snapshotVersion. If snapshot for snapshotVersion does not exist, an error will be thrown.
*
* @param startVersion checkpoint version of the snapshot to start with
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
* @return [[HDFSBackedReadStateStore]]
*/
override def replayReadStateFromSnapshot(startVersion: Long, endVersion: Long): ReadStateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(startVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, startVersion)} to " +
override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
ReadStateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for readonly")
new HDFSBackedReadStateStore(endVersion, newMap)
}

/**
* Construct the state at endVersion from snapshot from startVersion.
* Construct the state at endVersion from snapshot from snapshotVersion.
* Returns a new [[HDFSBackedStateStoreMap]]
* @param startVersion checkpoint version of the snapshot to start with
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
*/
private def replayLoadedMapForStoreFromSnapshot(startVersion: Long, endVersion: Long):
private def replayLoadedMapForStoreFromSnapshot(snapshotVersion: Long, endVersion: Long):
HDFSBackedStateStoreMap = synchronized {
try {
if (startVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
if (snapshotVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion)
}
if (endVersion < startVersion || endVersion < 0) {
if (endVersion < snapshotVersion || endVersion < 0) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}

val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
if (endVersion != 0) {
newMap.putAll(constructMapFromSnapshot(startVersion, endVersion))
newMap.putAll(constructMapFromSnapshot(snapshotVersion, endVersion))
}
newMap
}
Expand All @@ -949,29 +950,29 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}
}

private def constructMapFromSnapshot(startVersion: Long, endVersion: Long):
private def constructMapFromSnapshot(snapshotVersion: Long, endVersion: Long):
HDFSBackedStateStoreMap = {
val (result, elapsedMs) = Utils.timeTakenMs {
val startVersionMap = synchronized { Option(loadedMaps.get(startVersion)) } match {
val startVersionMap = synchronized { Option(loadedMaps.get(snapshotVersion)) } match {
case Some(value) => Option(value)
case None => readSnapshotFile(startVersion)
case None => readSnapshotFile(snapshotVersion)
}
if (startVersionMap.isEmpty) {
throw StateStoreErrors.stateStoreSnapshotFileNotFound(
snapshotFile(startVersion).toString, toString())
snapshotFile(snapshotVersion).toString, toString())
}

// Load all the deltas from the version after the start version up to the end version.
val resultMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
resultMap.putAll(startVersionMap.get)
for (deltaVersion <- startVersion + 1 to endVersion) {
for (deltaVersion <- snapshotVersion + 1 to endVersion) {
updateFromDeltaFile(deltaVersion, resultMap)
}

resultMap
}

logDebug(s"Loading state from $startVersion to $endVersion takes $elapsedMs ms.")
logDebug(s"Loading state from $snapshotVersion to $endVersion takes $elapsedMs ms.")

result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,25 +231,25 @@ class RocksDB(
* end version. Note that this will copy all the necessary files from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*
* @param startVersion version of the snapshot to start with
* @param snapshotVersion version of the snapshot to start with
* @param endVersion end version
* @return A RocksDB instance loaded with the state endVersion replayed from startVersion.
* @return A RocksDB instance loaded with the state endVersion replayed from snapshotVersion.
* Note that the instance will be read-only since this method is only used in State Data
* Source.
*/
def loadFromSnapshot(startVersion: Long, endVersion: Long): RocksDB = {
assert(startVersion >= 0 && endVersion >= startVersion)
def loadFromSnapshot(snapshotVersion: Long, endVersion: Long): RocksDB = {
assert(snapshotVersion >= 0 && endVersion >= snapshotVersion)
acquire(LoadStore)
recordedMetrics = None
logInfo(
log"Loading ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
log"${MDC(LogKeys.VERSION_NUM, startVersion)}")
log"${MDC(LogKeys.VERSION_NUM, snapshotVersion)}")
try {
replayFromCheckpoint(startVersion, endVersion)
replayFromCheckpoint(snapshotVersion, endVersion)

logInfo(
log"Loaded ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
log"${MDC(LogKeys.VERSION_NUM, startVersion)}")
log"${MDC(LogKeys.VERSION_NUM, snapshotVersion)}")
} catch {
case t: Throwable =>
loadedVersion = -1 // invalidate loaded data
Expand All @@ -263,17 +263,17 @@ class RocksDB(
* end version.
* If the start version does not exist, it will throw an exception.
*
* @param startVersion start checkpoint version
* @param snapshotVersion start checkpoint version
* @param endVersion end version
*/
private def replayFromCheckpoint(startVersion: Long, endVersion: Long): Any = {
if (loadedVersion != startVersion) {
private def replayFromCheckpoint(snapshotVersion: Long, endVersion: Long): Any = {
if (loadedVersion != snapshotVersion) {
closeDB()
val metadata = fileManager.loadCheckpointFromDfs(startVersion, workingDir)
loadedVersion = startVersion
val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, workingDir)
loadedVersion = snapshotVersion

// reset last snapshot version
if (lastSnapshotVersion > startVersion) {
if (lastSnapshotVersion > snapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,31 +369,31 @@ private[sql] class RocksDBStateStoreProvider
if (!condition) { throw new IllegalStateException(msg) }
}

override def replayStateFromSnapshot(startVersion: Long, endVersion: Long): StateStore = {
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
try {
if (startVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
if (snapshotVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion)
}
if (endVersion < startVersion) {
if (endVersion < snapshotVersion) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}
rocksDB.loadFromSnapshot(startVersion, endVersion)
rocksDB.loadFromSnapshot(snapshotVersion, endVersion)
new RocksDBStateStore(endVersion)
}
catch {
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}

override def replayReadStateFromSnapshot(startVersion: Long, endVersion: Long): StateStore = {
override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
try {
if (startVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
if (snapshotVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion)
}
if (endVersion < startVersion) {
if (endVersion < snapshotVersion) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}
rocksDB.loadFromSnapshot(startVersion, endVersion)
rocksDB.loadFromSnapshot(snapshotVersion, endVersion)
new RocksDBStateStore(endVersion)
}
catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,9 @@ trait SupportsFineGrainedReplayFromSnapshot {
* Used by snapshotStartBatchId option when reading state generated by join operation as data
* source.
* Return an instance of [[StateStore]] representing state data of the given version.
* The State Store will be constructed from the snapshot at startVersion, and applying delta files
* up to the endVersion. If there is no snapshot file at startVersion, an exception will be
* thrown.
* The State Store will be constructed from the snapshot at snapshotVersion, and applying delta
* files up to the endVersion. If there is no snapshot file at snapshotVersion, an exception will
* be thrown.
*
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
Expand All @@ -462,8 +462,8 @@ trait SupportsFineGrainedReplayFromSnapshot {
* Used by snapshotStartBatchId option when reading state generated by all stateful operations
* except join as data source.
* Return an instance of [[ReadStateStore]] representing state data of the given version.
* The State Store will be constructed from the snapshot at startVersion, and applying delta files
* up to the endVersion. If there is no snapshot file at startVersion, an exception will be
* The State Store will be constructed from the snapshot at snapshotVersion, and applying delta files
* up to the endVersion. If there is no snapshot file at snapshotVersion, an exception will be
* thrown.
*
* @param snapshotVersion checkpoint version of the snapshot to start with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
testSnapshotNotFound()
}

test("provider.getReadStore(startVersion, endVersion)") {
test("provider.replayReadStoreFromSnapshot(snapshotVersion, endVersion)") {
testGetReadStoreWithStartVersion()
}

Expand Down Expand Up @@ -451,7 +451,7 @@ StateDataSourceReadSuite {
testSnapshotNotFound()
}

test("provider.getReadStore(startVersion, endVersion)") {
test("provider.getReadStore(snapshotVersion, endVersion)") {
testGetReadStoreWithStartVersion()
}

Expand Down

0 comments on commit e15213e

Please sign in to comment.