Skip to content

Commit

Permalink
add new test on partition not found error
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 11, 2024
1 parent 292ec5d commit aa337c1
Showing 1 changed file with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,21 +386,17 @@ class HDFSBackedStateDataSourceReadSuite
override protected def newStateStoreProvider(): HDFSBackedStateStoreProvider =
new HDFSBackedStateStoreProvider

test("ERROR: snapshot partition not found") {
testPartitionNotFound()
test("ERROR: snapshot of version not found") {
testSnapshotNotFound()
}

test("provider.getReadStore(startVersion, endVersion)") {
testGetReadStoreWithStart()
testGetReadStoreWithStartVersion()
}

test("option snapshotPartitionId") {
testSnapshotPartitionId()
}

test("snapshotStartBatchId and snapshotPartitionId end to end") {
testSnapshotEndToEnd()
}
}

class RocksDBStateDataSourceReadSuite
Expand Down Expand Up @@ -433,11 +429,11 @@ class RocksDBWithChangelogCheckpointStateDataSourceReaderSuite
new RocksDBStateStoreProvider

test("ERROR: snapshot partition not found") {
testPartitionNotFound()
testSnapshotNotFound()
}

test("provider.getReadStore(startVersion, endVersion)") {
testGetReadStoreWithStart()
testGetReadStoreWithStartVersion()
}

test("option snapshotPartitionId") {
Expand Down Expand Up @@ -952,7 +948,7 @@ abstract class StateDataSourceReadSuite[storeProvider <: StateStoreProvider]
}
}

protected def testPartitionNotFound(): Unit = {
protected def testSnapshotNotFound(): Unit = {
withTempDir(tempDir => {
val provider = getNewStateStoreProvider(tempDir.getAbsolutePath)
for (i <- 1 to 4) {
Expand All @@ -969,7 +965,7 @@ abstract class StateDataSourceReadSuite[storeProvider <: StateStoreProvider]
})
}

protected def testGetReadStoreWithStart(): Unit = {
protected def testGetReadStoreWithStartVersion(): Unit = {
withTempDir(tempDir => {
val provider = getNewStateStoreProvider(tempDir.getAbsolutePath)
for (i <- 1 to 4) {
Expand Down Expand Up @@ -1007,7 +1003,23 @@ abstract class StateDataSourceReadSuite[storeProvider <: StateStoreProvider]
.option(StateSourceOptions.BATCH_ID, 0)
.load(tempDir.getAbsolutePath)

// should result in only one partition && should not throw error in planning stage
assert(stateDf.rdd.getNumPartitions == 1)

// should throw error when partition id is out of range
val stateDfError = spark.read.format("statestore")
.option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
.option(
StateSourceOptions.SNAPSHOT_PARTITION_ID,
spark.sessionState.conf.numShufflePartitions)
.option(StateSourceOptions.BATCH_ID, 0)
.load(tempDir.getAbsolutePath)

val exc = intercept[SparkException] {
stateDfError.show()
}
checkError(exc, "SNAPSHOT_PARTITION_ID_NOT_FOUND", "54054",
Map("snapshotPartitionId" -> spark.sessionState.conf.numShufflePartitions.toString))
})
}
}

0 comments on commit aa337c1

Please sign in to comment.