Skip to content

Commit

Permalink
Merge branch 'skipSnapshotAtBatch' into state-cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 21, 2024
2 parents 2184396 + 2eb6646 commit bd87055
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,21 @@ class StateTable(
}

override def name(): String = {
val desc = s"StateTable " +
var desc = s"StateTable " +
s"[stateCkptLocation=${sourceOptions.stateCheckpointLocation}]" +
s"[batchId=${sourceOptions.batchId}][operatorId=${sourceOptions.operatorId}]" +
s"[storeName=${sourceOptions.storeName}]"

if (sourceOptions.joinSide != JoinSideValues.none) {
desc + s"[joinSide=${sourceOptions.joinSide}]"
} else {
desc
desc += s"[joinSide=${sourceOptions.joinSide}]"
}
if (sourceOptions.snapshotStartBatchId.isDefined) {
desc += s"[snapshotStartBatchId=${sourceOptions.snapshotStartBatchId}]"
}
if (sourceOptions.snapshotPartitionId.isDefined) {
desc += s"[snapshotPartitionId=${sourceOptions.snapshotPartitionId}]"
}
desc
}

override def capabilities(): util.Set[TableCapability] = CAPABILITY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
case None =>
logWarning(
log"The state for version ${MDC(LogKeys.FILE_VERSION, startVersion)} doesn't " +
log"exist in loadedMaps. Reading snapshot file and delta files if needed..." +
log"Note that this is normal for the first batch of starting query.")
log"exist in loadedMaps. Reading snapshot file and delta files if needed..." +
log"Note that this is normal for the first batch of starting query.")
loadedMapCacheMissCount.increment()
readSnapshotFile(startVersion)
}
Expand Down

0 comments on commit bd87055

Please sign in to comment.