create new error for SupportsFineGrainedReplayFromSnapshot
eason-yuchen-liu committed Jun 26, 2024
1 parent 97ee3ef commit 23639f4
Showing 4 changed files with 30 additions and 5 deletions.
7 changes: 7 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -3773,6 +3773,13 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY" : {
+    "message" : [
+      "The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.SupportsFineGrainedReplayFromSnapshot.",
+      "Therefore, it does not support option snapshotStartBatchId in state data source."
+    ],
+    "sqlState" : "42K06"
+  },
   "STATE_STORE_UNSUPPORTED_OPERATION" : {
     "message" : [
       "<operationType> operation not supported with <entity>"
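For context, the condition added above is user-facing: it fires when the state data source is asked to replay state from a snapshot but the configured State Store Provider cannot do fine-grained replay. The sketch below shows the kind of read that would hit it, assuming a checkpoint written with a custom provider that does not extend SupportsFineGrainedReplayFromSnapshot; the checkpoint path and batch ids are placeholders, and the statestore format/option names follow the state data source mentioned in the error message.

```scala
// Sketch only: reading streaming state with snapshotStartBatchId. With a custom
// StateStoreProvider that does not extend SupportsFineGrainedReplayFromSnapshot,
// this read is expected to fail with the new error condition.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val stateDf = spark.read
  .format("statestore")
  .option("snapshotStartBatchId", "1")  // replay starts from this snapshot
  .option("batchId", "5")               // reconstruct state as of this batch
  .load("/tmp/streaming-checkpoint")    // placeholder checkpoint location

stateDf.show()
```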
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.state

+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -97,9 +98,16 @@ class StatePartitionReader(
       case None => provider.getReadStore(partition.sourceOptions.batchId + 1)

       case Some(snapshotStartBatchId) =>
-        provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot].replayReadStateFromSnapshot(
-          snapshotStartBatchId + 1,
-          partition.sourceOptions.batchId + 1)
+        if (!provider.isInstanceOf[SupportsFineGrainedReplayFromSnapshot]) {
+          throw new SparkException(
+            errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
+            messageParameters = Map("inputClass" -> provider.getClass.toString),
+            cause = null)
+        }
+        provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
+          .replayReadStateFromSnapshot(
+            snapshotStartBatchId + 1,
+            partition.sourceOptions.batchId + 1)
     }
   }

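The change above replaces a bare downcast with an explicit capability check, so an unsupported provider fails with a classified error instead of an opaque ClassCastException. Here is a minimal, self-contained sketch of that pattern, using hypothetical names (StoreProvider, Replayable, demoReplay) in place of the Spark types:

```scala
// Capability-check-before-cast, mirroring the guard in the commit, with
// hypothetical stand-in types.
trait StoreProvider
trait Replayable extends StoreProvider {
  def replayFromSnapshot(snapshotVersion: Long, endVersion: Long): String
}

final class SnapshotProvider extends Replayable {
  def replayFromSnapshot(snapshotVersion: Long, endVersion: Long): String =
    s"replayed state from version $snapshotVersion up to $endVersion"
}
final class BasicProvider extends StoreProvider  // no fine-grained replay support

def demoReplay(provider: StoreProvider, snapshotVersion: Long, endVersion: Long): String = {
  if (!provider.isInstanceOf[Replayable]) {
    // Fail fast with the offending class in the message, as the new
    // SparkException does, instead of letting the downcast below blow up.
    throw new UnsupportedOperationException(
      s"${provider.getClass.getName} does not support fine-grained state replay")
  }
  provider.asInstanceOf[Replayable].replayFromSnapshot(snapshotVersion, endVersion)
}

demoReplay(new SnapshotProvider, 2, 5)   // returns the replayed-range string
// demoReplay(new BasicProvider, 2, 5)   // would throw UnsupportedOperationException
```

A pattern match (case p: Replayable => ...) would express the same check-and-cast in one step; the commit keeps the explicit isInstanceOf guard so the existing asInstanceOf call site stays unchanged.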
@@ -23,6 +23,7 @@ import scala.annotation.tailrec

 import org.apache.hadoop.conf.Configuration

+import org.apache.spark.SparkException
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{END_INDEX, START_INDEX, STATE_STORE_ID}
@@ -493,6 +494,12 @@ class SymmetricHashJoinStateManager(
         useColumnFamilies = false, storeConf, hadoopConf,
         useMultipleValuesPerKey = false)
       if (snapshotStartVersion.isDefined) {
+        if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplayFromSnapshot]) {
+          throw new SparkException(
+            errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
+            messageParameters = Map("inputClass" -> stateStoreProvider.getClass.toString),
+            cause = null)
+        }
         stateStoreProvider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
           .replayStateFromSnapshot(snapshotStartVersion.get, stateInfo.get.storeVersion)
       } else {
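The same guard protects the stream-stream join path. Because the failure is now a SparkException carrying an error condition, callers can recognize it programmatically rather than parsing a ClassCastException. A sketch follows; buildJoinStateManager and the provider class name are hypothetical stand-ins, defined here only so the snippet is self-contained.

```scala
// Sketch: branching on the error condition carried by the classified exception.
import org.apache.spark.SparkException

// Hypothetical stand-in for whatever call path reaches the guard added above.
def buildJoinStateManager(): Unit = {
  throw new SparkException(
    errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
    messageParameters = Map("inputClass" -> "class com.example.MyProvider"),  // placeholder
    cause = null)
}

try {
  buildJoinStateManager()
} catch {
  case e: SparkException
      if e.getErrorClass == "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY" =>
    // e.g. fall back to replaying the full changelog, or surface a clearer message
    println(s"Provider cannot replay from snapshot: ${e.getMessage}")
}
```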
@@ -985,7 +985,8 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
       }

       val exc = intercept[SparkException] {
-        provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot].replayReadStateFromSnapshot(1, 2)
+        provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
+          .replayReadStateFromSnapshot(1, 2)
       }
       checkError(exc, "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED")
     })
@@ -1001,7 +1002,9 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
         provider.doMaintenance()
       }

-      val result = provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot].replayReadStateFromSnapshot(2, 3)
+      val result =
+        provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
+          .replayReadStateFromSnapshot(2, 3)

       assert(get(result, "a", 1).get == 1)
       assert(get(result, "a", 2).get == 2)
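The test changes in this commit are line-wrapping cleanups to the existing snapshot-replay tests; no test for the new condition is added here. A hedged sketch of what a follow-up test might look like, assuming the checkpoint was produced with a custom provider lacking the trait, and assuming the suite's spark session, intercept helper, and a placeholder tempDir are available:

```scala
// Hypothetical follow-up test, not part of this commit. It assumes the
// checkpoint was written with a custom StateStoreProvider that does not extend
// SupportsFineGrainedReplayFromSnapshot; tempDir is a placeholder path.
val exc = intercept[SparkException] {
  spark.read
    .format("statestore")
    .option("snapshotStartBatchId", "0")
    .load(tempDir.getAbsolutePath)
    .collect()
}
assert(exc.getErrorClass ==
  "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY")
```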
