Skip to content

Commit

Permalink
add input error tests
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jul 2, 2024
1 parent 7354408 commit 9cf8b25
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,26 @@ object StateSourceOptions extends DataSourceOptions {
if (changeStartBatchId.isEmpty) {
throw StateDataSourceErrors.requiredOptionUnspecified(CHANGE_START_BATCH_ID)
}
changeEndBatchId = Option(
changeEndBatchId.getOrElse(getLastCommittedBatch(sparkSession, resolvedCpLocation)))
changeEndBatchId = Option(changeEndBatchId.getOrElse(batchId))

// changeStartBatchId and changeEndBatchId must all be defined at this point
if (changeStartBatchId.get < 0) {
throw StateDataSourceErrors.invalidOptionValueIsNegative(CHANGE_START_BATCH_ID)
}
if (changeEndBatchId.get < changeStartBatchId.get) {
throw StateDataSourceErrors.invalidOptionValue(CHANGE_END_BATCH_ID,
s"$CHANGE_END_BATCH_ID cannot be smaller than $CHANGE_START_BATCH_ID. " +
s"Please check the input to $CHANGE_END_BATCH_ID, or if you are using its default " +
s"value, make sure that $CHANGE_START_BATCH_ID is less than ${changeEndBatchId.get}.")
}
} else {
if (changeStartBatchId.isDefined) {
throw
StateDataSourceErrors.conflictOptions(Seq(READ_CHANGE_FEED, CHANGE_START_BATCH_ID))
throw StateDataSourceErrors.invalidOptionValue(CHANGE_START_BATCH_ID,
s"Only specify this option when $READ_CHANGE_FEED is set to true.")
}
if (changeEndBatchId.isDefined) {
throw StateDataSourceErrors.conflictOptions(Seq(READ_CHANGE_FEED, CHANGE_END_BATCH_ID))
throw StateDataSourceErrors.invalidOptionValue(CHANGE_END_BATCH_ID,
s"Only specify this option when $READ_CHANGE_FEED is set to true.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,62 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
provider
}

test("ERROR: specify changeStartBatchId in normal mode") {
withTempDir { tempDir =>
val exc = intercept[StateDataSourceInvalidOptionValue] {
spark.read.format("statestore")
.option(StateSourceOptions.BATCH_ID, 0)
.option(StateSourceOptions.CHANGE_START_BATCH_ID, 0)
.option(StateSourceOptions.CHANGE_END_BATCH_ID, 2)
.load(tempDir.getAbsolutePath)
}
assert(exc.getErrorClass === "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE")
}
}

test("ERROR: changeStartBatchId is set to negative") {
withTempDir { tempDir =>
val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
spark.read.format("statestore")
.option(StateSourceOptions.BATCH_ID, 0)
.option(StateSourceOptions.READ_CHANGE_FEED, value = true)
.option(StateSourceOptions.CHANGE_START_BATCH_ID, -1)
.option(StateSourceOptions.CHANGE_END_BATCH_ID, 0)
.load(tempDir.getAbsolutePath)
}
assert(exc.getErrorClass === "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE")
}
}

test("ERROR: changeEndBatchId is set to less than changeStartBatchId") {
withTempDir { tempDir =>
val exc = intercept[StateDataSourceInvalidOptionValue] {
spark.read.format("statestore")
.option(StateSourceOptions.BATCH_ID, 0)
.option(StateSourceOptions.READ_CHANGE_FEED, value = true)
.option(StateSourceOptions.CHANGE_START_BATCH_ID, 1)
.option(StateSourceOptions.CHANGE_END_BATCH_ID, 0)
.load(tempDir.getAbsolutePath)
}
assert(exc.getErrorClass === "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE")
}
}

test("ERROR: joinSide option is used together with readChangeFeed") {
withTempDir { tempDir =>
val exc = intercept[StateDataSourceConflictOptions] {
spark.read.format("statestore")
.option(StateSourceOptions.BATCH_ID, 0)
.option(StateSourceOptions.READ_CHANGE_FEED, value = true)
.option(StateSourceOptions.JOIN_SIDE, "left")
.option(StateSourceOptions.CHANGE_START_BATCH_ID, 0)
.option(StateSourceOptions.CHANGE_END_BATCH_ID, 0)
.load(tempDir.getAbsolutePath)
}
assert(exc.getErrorClass === "STDS_CONFLICT_OPTIONS")
}
}

test("getChangeDataReader of state store provider") {
def withNewStateStore(provider: StateStoreProvider, version: Int)(f: StateStore => Unit):
Unit = {
Expand Down

0 comments on commit 9cf8b25

Please sign in to comment.