Skip to content

Commit

Permalink
Fix Checksum.scala store.read callsite to use the new LogStore API
Browse files Browse the repository at this point in the history
Cache the fileStatus of the last-read checksum file in SnapshotManagement.scala. This cache can then be used to potentially invoke the new LogStore read API in Checksum.scala.

Closes #2643

GitOrigin-RevId: e285e87168f4816e729bd10fc7a86a0f3624b2cc
  • Loading branch information
sumeet-db authored and vkorukanti committed Feb 22, 2024
1 parent a842991 commit 210503a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
16 changes: 12 additions & 4 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.FileSizeHistogram
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -117,12 +118,19 @@ trait ReadChecksum extends DeltaLogging { self: DeltaLog =>
val logPath: Path
private[delta] def store: LogStore

private[delta] def readChecksum(version: Long): Option[VersionChecksum] = {
private[delta] def readChecksum(
version: Long,
checksumFileStatusHintOpt: Option[FileStatus] = None): Option[VersionChecksum] = {
recordDeltaOperation(self, "delta.readChecksum") {
val checksumFile = FileNames.checksumFile(logPath, version)

val checksumFilePath = FileNames.checksumFile(logPath, version)
val verifiedChecksumFileStatusOpt =
checksumFileStatusHintOpt.filter(_.getPath == checksumFilePath)
var exception: Option[String] = None
val content = try Some(store.read(checksumFile, newDeltaHadoopConf())) catch {
val content = try Some(
verifiedChecksumFileStatusOpt
.map(store.read(_, newDeltaHadoopConf()))
.getOrElse(store.read(checksumFilePath, newDeltaHadoopConf()))
) catch {
case NonFatal(e) =>
// We expect FileNotFoundException; if it's another kind of exception, we still catch them
// here but we log them in the checksum error event below.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ trait SnapshotManagement { self: DeltaLog =>

@volatile private[delta] var asyncUpdateTask: Future[Unit] = _

/**
* Cached fileStatus for the latest CRC file seen in the deltaLog.
*/
@volatile protected var lastSeenChecksumFileStatusOpt: Option[FileStatus] = None
@volatile protected var currentSnapshot: CapturedSnapshot = getSnapshotAtInit

/** Use ReentrantLock to allow us to call `lockInterruptibly` */
Expand Down Expand Up @@ -121,6 +125,7 @@ trait SnapshotManagement { self: DeltaLog =>
*
* @param startVersion the version to start. Inclusive.
* @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
* @param includeMinorCompactions Whether to include minor compaction files in the result
* @return Some array of files found (possibly empty, if no usable commit files are present), or
* None if the listing returned no files at all.
*/
Expand All @@ -130,14 +135,19 @@ trait SnapshotManagement { self: DeltaLog =>
includeMinorCompactions: Boolean): Option[Array[FileStatus]] =
recordDeltaOperation(self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
listFromOrNone(startVersion).map { _
.collect {
.flatMap {
case DeltaFile(f, fileVersion) =>
(f, fileVersion)
Some((f, fileVersion))
case CompactedDeltaFile(f, startVersion, endVersion)
if includeMinorCompactions && versionToLoad.forall(endVersion <= _) =>
(f, startVersion)
Some((f, startVersion))
case CheckpointFile(f, fileVersion) if f.getLen > 0 =>
(f, fileVersion)
Some((f, fileVersion))
case ChecksumFile(f, version) if versionToLoad.forall(version <= _) =>
lastSeenChecksumFileStatusOpt = Some(f)
None
case _ =>
None
}
// take files until the version we want to load
.takeWhile { case (_, fileVersion) => versionToLoad.forall(fileVersion <= _) }
Expand Down Expand Up @@ -500,7 +510,8 @@ trait SnapshotManagement { self: DeltaLog =>
logSegment = segment,
deltaLog = this,
timestamp = segment.lastCommitTimestamp,
checksumOpt = checksumOpt.orElse(readChecksum(segment.version))
checksumOpt = checksumOpt.orElse(
readChecksum(segment.version, lastSeenChecksumFileStatusOpt))
)
}
}
Expand Down

0 comments on commit 210503a

Please sign in to comment.