Skip to content

Commit

Permalink
reflect comments from Jungtaek
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 25, 2024
1 parent ef9b095 commit 876256e
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ case class StateSourceOptions(

override def toString: String = {
s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " +
s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide" +
snapshotStartBatchId.map(", snapshotStartBatchId=" + _).getOrElse("") +
snapshotPartitionId.map(", snapshotPartitionId=" + _).getOrElse("") + ")"
s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide, " +
s"snapshotStartBatchId=${snapshotStartBatchId.getOrElse("None")}, " +
s"snapshotPartitionId=${snapshotPartitionId.getOrElse("None")})"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class StatePartitionReader(
private lazy val store: ReadStateStore = {
partition.sourceOptions.snapshotStartBatchId match {
case None => provider.getReadStore(partition.sourceOptions.batchId + 1)

case Some(snapshotStartBatchId) => provider.getReadStore(
snapshotStartBatchId + 1,
partition.sourceOptions.batchId + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class StateScan(
case None => partitionNums.map { pn =>
new StateStoreInputPartition(pn, queryId, sourceOptions)
}.toArray

case Some(snapshotPartitionId) =>
if (partitionNums.contains(snapshotPartitionId)) {
Array(new StateStoreInputPartition(snapshotPartitionId, queryId, sourceOptions))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ trait StateDataSourceTestBase extends StreamTest with StateStoreMetricsTest {
}
}

protected def getStreamStreamJoinQuery(inputStream: MemoryStream[(Int, Long)]): DataFrame = {
private def getStreamStreamJoinQuery(inputStream: MemoryStream[(Int, Long)]): DataFrame = {
val df = inputStream.toDS()
.select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp"))

Expand Down

0 comments on commit 876256e

Please sign in to comment.