Skip to content

Commit

Permalink
address comments from Anish
Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 29, 2024
1 parent d140708 commit 337785d
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -891,15 +891,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}

/**
* Get the state store of endVersion for reading by applying delta files on the snapshot of
* snapshotVersion. If snapshot for snapshotVersion does not exist, an error will be thrown.
* Get the state store of endVersion by applying delta files on the snapshot of snapshotVersion.
* If snapshot for snapshotVersion does not exist, an error will be thrown.
*
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
* @return [[HDFSBackedStateStore]]
*/
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
Expand All @@ -917,7 +917,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
*/
override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
ReadStateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
Expand All @@ -926,12 +926,12 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}

/**
* Consturct the state at endVersion from snapshot from snapshotVersion.
* Construct the state map at endVersion from snapshot of version snapshotVersion.
* Returns a new [[HDFSBackedStateStoreMap]]
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
*/
private def replayLoadedMapForStoreFromSnapshot(snapshotVersion: Long, endVersion: Long):
private def replayLoadedMapFromSnapshot(snapshotVersion: Long, endVersion: Long):
HDFSBackedStateStoreMap = synchronized {
try {
if (snapshotVersion < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,14 @@ private[sql] class RocksDBStateStoreProvider
if (!condition) { throw new IllegalStateException(msg) }
}

/**
* Get the state store of endVersion by applying delta files on the snapshot of snapshotVersion.
* If snapshot for snapshotVersion does not exist, an error will be thrown.
*
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
* @return [[StateStore]]
*/
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
try {
if (snapshotVersion < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ object StateStoreProvider {
* snapshotStartBatchId option in state data source.
*/
trait SupportsFineGrainedReplay {

/**
* Return an instance of [[StateStore]] representing state data of the given version.
* The State Store will be constructed from the snapshot at snapshotVersion, and applying delta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,15 @@ class StateStoreValueSchemaNotCompatible(
"newValueSchema" -> newValueSchema))

class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
extends SparkUnsupportedOperationException(
extends SparkRuntimeException(
errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE",
messageParameters = Map(
"fileToRead" -> fileToRead,
"clazz" -> clazz))

class StateStoreSnapshotPartitionNotFound(
snapshotPartitionId: Long, operatorId: Int, checkpointLocation: String)
extends SparkUnsupportedOperationException(
extends SparkRuntimeException(
errorClass = "CANNOT_LOAD_STATE_STORE.SNAPSHOT_PARTITION_ID_NOT_FOUND",
messageParameters = Map(
"snapshotPartitionId" -> snapshotPartitionId.toString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1065,27 +1065,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass

protected def testSnapshotOnLimitState(providerName: String): Unit = {
/** The golden files are generated by:
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().limit(10)
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (7, 7L), (8, 8L), (9, 9L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (10, 10L), (11, 11L), (12, 12L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().limit(10)
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (7, 7L), (8, 8L), (9, 9L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (10, 10L), (11, 11L), (12, 12L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
*/
val resourceUri = this.getClass.getResource(
s"/structured-streaming/checkpoint-version-4.0.0/$providerName/limit/"
Expand All @@ -1096,27 +1096,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass

protected def testSnapshotOnAggregateState(providerName: String): Unit = {
/** The golden files are generated by:
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().groupBy("_1").count()
testStream(query, OutputMode.Update)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().groupBy("_1").count()
testStream(query, OutputMode.Update)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
*/
val resourceUri = this.getClass.getResource(
s"/structured-streaming/checkpoint-version-4.0.0/$providerName/dedup/"
Expand All @@ -1127,27 +1127,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass

protected def testSnapshotOnDeduplicateState(providerName: String): Unit = {
/** The golden files are generated by:
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().dropDuplicates("_1")
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().dropDuplicates("_1")
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
*/
val resourceUri = this.getClass.getResource(
s"/structured-streaming/checkpoint-version-4.0.0/$providerName/dedup/"
Expand All @@ -1158,25 +1158,25 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass

protected def testSnapshotOnJoinState(providerName: String, stateVersion: Int): Unit = {
/** The golden files are generated by:
withSQLConf({
SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> stateVersion.toString
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = getStreamStreamJoinQuery(inputData)
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
withSQLConf({
SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> stateVersion.toString
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = getStreamStreamJoinQuery(inputData)
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
*/
val resourceUri = this.getClass.getResource(
s"/structured-streaming/checkpoint-version-4.0.0/$providerName/join$stateVersion/"
Expand Down

0 comments on commit 337785d

Please sign in to comment.