Commit

address comments from Anish & Wei
eason-yuchen-liu committed Jun 13, 2024
1 parent 4825215 commit 20e1b9c
Showing 6 changed files with 40 additions and 39 deletions.
@@ -246,7 +246,7 @@
},
"SNAPSHOT_PARTITION_ID_NOT_FOUND" : {
"message" : [
"Partition id <snapshotPartitionId> not found for given state source."
"Partition id <snapshotPartitionId> not found for state of operator <operatorId> at <checkpointLocation>."
]
},
"UNCATEGORIZED" : {
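With the operator id and checkpoint location added to the template, a raised error would read roughly as follows; the values here are illustrative only, not taken from the commit:

Partition id 5 not found for state of operator 0 at file:/tmp/checkpoint/state.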
@@ -2159,18 +2159,6 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
cause = null)
}

def failedToReadSnapshotFileNotExistsError(
fileToRead: Path,
clazz: String,
f: Throwable): Throwable = {
new SparkException(
errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_NOT_EXISTS",
messageParameters = Map(
"fileToRead" -> fileToRead.toString(),
"clazz" -> clazz),
cause = f)
}

def failedToReadSnapshotFileValueSizeError(
fileToRead: Path,
clazz: String,
@@ -2198,13 +2186,6 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
cause = f)
}

def snapshotPartitionNotFoundError(snapshotPartitionId : Long): Throwable = {
new SparkException(
errorClass = "CANNOT_LOAD_STATE_STORE.SNAPSHOT_PARTITION_ID_NOT_FOUND",
messageParameters = Map("snapshotPartitionId" -> snapshotPartitionId.toString()),
cause = null)
}

def cannotPurgeAsBreakInternalStateError(): SparkUnsupportedOperationException = {
new SparkUnsupportedOperationException(errorClass = "_LEGACY_ERROR_TEMP_2260")
}
@@ -24,10 +24,9 @@ import org.apache.hadoop.fs.{Path, PathFilter}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
import org.apache.spark.sql.execution.streaming.state.StateStoreConf
import org.apache.spark.sql.execution.streaming.state.{StateStoreConf, StateStoreSnapshotPartitionNotFound}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

@@ -90,7 +89,9 @@ class StateScan(
if (partitionNums.contains(snapshotPartitionId)) {
Array(new StateStoreInputPartition(snapshotPartitionId, queryId, sourceOptions))
} else {
throw QueryExecutionErrors.snapshotPartitionNotFoundError(snapshotPartitionId)
throw new StateStoreSnapshotPartitionNotFound(
snapshotPartitionId, sourceOptions.operatorId,
sourceOptions.stateCheckpointLocation.toString)
}
}
}
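Throwing the dedicated exception type directly, instead of going through a QueryExecutionErrors helper, lets callers and tests match on the class itself while still reading the structured parameters. A minimal sketch of such handling, assuming a Batch named batch obtained from this scan; it is not part of the commit:

// Sketch only: catch the typed error and read its parameters via the
// standard SparkThrowable accessors.
try {
  batch.planInputPartitions()
} catch {
  case e: StateStoreSnapshotPartitionNotFound =>
    val missing = e.getMessageParameters.get("snapshotPartitionId")
    println(s"No snapshot found for partition $missing")
}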
@@ -267,6 +267,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
*
* @param startVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
* @return [[HDFSBackedStateStore]]
*/
override def getStore(startVersion: Long, endVersion: Long): StateStore = {
val newMap = getLoadedMapForStore(startVersion, endVersion)
@@ -290,6 +291,7 @@
*
* @param startVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
* @return [[HDFSBackedReadStateStore]]
*/
override def getReadStore(startVersion: Long, endVersion: Long): ReadStateStore = {
val newMap = getLoadedMapForStore(startVersion, endVersion)
@@ -598,17 +600,20 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
private def loadMap(startVersion: Long, endVersion: Long): HDFSBackedStateStoreMap = {

val (result, elapsedMs) = Utils.timeTakenMs {
val startVersionMap =
synchronized { Option(loadedMaps.get(startVersion)) }
.orElse{
logWarning(
log"The state for version ${MDC(LogKeys.FILE_VERSION, startVersion)} doesn't " +
log"exist in loadedMaps. Reading snapshot file and delta files if needed..." +
log"Note that this is normal for the first batch of starting query.")
readSnapshotFile(startVersion)}
val startVersionMap = Option(loadedMaps.get(startVersion)) match {
case Some(value) =>
loadedMapCacheHitCount.increment()
Option(value)
case None =>
logWarning(
log"The state for version ${MDC(LogKeys.FILE_VERSION, startVersion)} doesn't " +
log"exist in loadedMaps. Reading snapshot file and delta files if needed..." +
log"Note that this is normal for the first batch of starting query.")
loadedMapCacheMissCount.increment()
readSnapshotFile(startVersion)
}
if (startVersionMap.isEmpty) {
throw QueryExecutionErrors.failedToReadSnapshotFileNotExistsError(
snapshotFile(startVersion), toString(), null)
throw new StateStoreSnapshotFileNotFound(snapshotFile(startVersion).toString, toString())
}
synchronized { putStateIntoStateCacheMap(startVersion, startVersionMap.get) }

@@ -768,7 +773,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

/**
* try to read the snapshot file of the given version.
* If the snapshot file is not available, return None.
* If the snapshot file is not available, return [[None]].
*
* @param version the version of the snapshot file
*/
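For context, the two-version overloads documented in this file rebuild a store by loading the snapshot written at startVersion and then replaying the delta files up to endVersion. A rough usage sketch; the provider instance and the version numbers are assumed for illustration:

// Illustrative only: read-only view of state as of version 12, reconstructed
// from the snapshot at version 10 plus the subsequent delta files.
val readStore = provider.getReadStore(startVersion = 10L, endVersion = 12L)
readStore.iterator().take(5).foreach { pair =>
  println(s"key=${pair.key}, value=${pair.value}")
}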
@@ -245,3 +245,18 @@ class StateStoreValueSchemaNotCompatible(
"storedValueSchema" -> storedValueSchema,
"newValueSchema" -> newValueSchema))

class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
extends SparkUnsupportedOperationException(
errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_NOT_EXISTS",
messageParameters = Map(
"fileToRead" -> fileToRead,
"clazz" -> clazz))

class StateStoreSnapshotPartitionNotFound(
snapshotPartitionId: Long, operatorId: Int, checkpointLocation: String)
extends SparkUnsupportedOperationException(
errorClass = "CANNOT_LOAD_STATE_STORE.SNAPSHOT_PARTITION_ID_NOT_FOUND",
messageParameters = Map(
"snapshotPartitionId" -> snapshotPartitionId.toString,
"operatorId" -> operatorId.toString,
"checkpointLocation" -> checkpointLocation))
@@ -381,7 +381,7 @@ class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
classOf[HDFSBackedStateStoreProvider].getName)
newStateStoreProvider().getClass.getName)
// make sure we have a snapshot for every two delta files
// HDFS maintenance task will not count the latest delta file, which has the same version
// as the snapshot version
@@ -408,7 +408,7 @@ class RocksDBStateDataSourceReadSuite extends StateDataSourceReadSuite {
override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
classOf[RocksDBStateStoreProvider].getName)
newStateStoreProvider().getClass.getName)
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
"false")
}
@@ -1007,11 +1007,10 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
.option(StateSourceOptions.BATCH_ID, 0)
.load(tempDir.getAbsolutePath)

val exc = intercept[SparkException] {
val exc = intercept[StateStoreSnapshotPartitionNotFound] {
stateDfError.show()
}
checkError(exc, "CANNOT_LOAD_STATE_STORE.SNAPSHOT_PARTITION_ID_NOT_FOUND", "58030",
Map("snapshotPartitionId" -> spark.sessionState.conf.numShufflePartitions.toString))
assert(exc.getErrorClass === "CANNOT_LOAD_STATE_STORE.SNAPSHOT_PARTITION_ID_NOT_FOUND")
})
}
}
