Skip to content

Commit

Permalink
Refactoring around CheckpointMetadata - Move it from Snapshot to LogS…
Browse files Browse the repository at this point in the history
…egment.

GitOrigin-RevId: 200db588b57e8a5fdbf43a1199533a54fb66b4d0
  • Loading branch information
prakharjain09 authored and vkorukanti committed Apr 20, 2023
1 parent 3bcb8f0 commit 08d6f63
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 98 deletions.
74 changes: 49 additions & 25 deletions core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ case class CheckpointMetaData(
case Some(_) => CheckpointMetaData.Format.WITH_PARTS
case None => CheckpointMetaData.Format.SINGLE
}

/** Whether two [[CheckpointMetaData]] represents the same checkpoint */
def semanticEquals(other: CheckpointMetaData): Boolean = {
CheckpointInstance(this) == CheckpointInstance(other)
}
}

object CheckpointMetaData {
Expand Down Expand Up @@ -248,17 +253,16 @@ object CheckpointMetaData {
s""""$result""""
}

def fromLogSegment(segment: LogSegment): Option[CheckpointMetaData] = {
segment.checkpointVersionOpt.map { version =>
CheckpointMetaData(
version = version,
size = -1L,
parts = numCheckpointParts(segment.checkpoint.head.getPath),
sizeInBytes = Some(segment.checkpoint.map(_.getLen).sum),
numOfAddFiles = None,
checkpointSchema = None
)
}
def fromFiles(files: Seq[FileStatus]): CheckpointMetaData = {
assert(files.nonEmpty, "files should be non empty to construct CheckpointMetaData")
CheckpointMetaData(
version = checkpointVersion(files.head),
size = -1L,
parts = numCheckpointParts(files.head.getPath),
sizeInBytes = Some(files.map(_.getLen).sum),
numOfAddFiles = None,
checkpointSchema = None
)
}
}

Expand All @@ -278,13 +282,15 @@ case class CheckpointInstance(
s" ${CheckpointMetaData.Format.WITH_PARTS.name}")

/**
* Returns a [[CheckpointFileListProvider]] which can tell the files corresponding to this
* Returns a [[CheckpointProvider]] which can tell the files corresponding to this
* checkpoint.
* The `checkpointMetadataHint` might be passed to [[CheckpointProvider]] so that underlying
* [[CheckpointProvider]] provides more precise info.
*/
def getCheckpointFileListProvider(
def getCheckpointProvider(
logPath: Path,
filesForLegacyCheckpointConstruction: Seq[FileStatus],
checkpointMetadataHint: Option[CheckpointMetaData] = None): CheckpointFileListProvider = {
filesForCheckpointConstruction: Seq[FileStatus],
checkpointMetadataHint: Option[CheckpointMetaData] = None): CheckpointProvider = {
format match {
case CheckpointMetaData.Format.WITH_PARTS | CheckpointMetaData.Format.SINGLE =>
val filePaths = if (format == CheckpointMetaData.Format.WITH_PARTS) {
Expand All @@ -293,12 +299,14 @@ case class CheckpointInstance(
Set(checkpointFileSingular(logPath, version))
}
val newCheckpointFileArray =
filesForLegacyCheckpointConstruction.filter(f => filePaths.contains(f.getPath))
filesForCheckpointConstruction.filter(f => filePaths.contains(f.getPath))
assert(newCheckpointFileArray.length == filePaths.size,
"Failed in getting the file information for:\n" +
filePaths.mkString(" -", "\n -", "") + "\namong\n" +
filesForLegacyCheckpointConstruction.map(_.getPath).mkString(" -", "\n -", ""))
PreloadedCheckpointFileProvider(newCheckpointFileArray)
filesForCheckpointConstruction.map(_.getPath).mkString(" -", "\n -", ""))
PreloadedCheckpointProvider(
newCheckpointFileArray,
checkpointMetadataHint.filter(cm => CheckpointInstance(cm) == this))
case CheckpointMetaData.Format.SENTINEL =>
throw DeltaErrors.assertionFailedError(
s"invalid checkpoint format ${CheckpointMetaData.Format.SENTINEL}")
Expand Down Expand Up @@ -484,7 +492,8 @@ trait Checkpoints extends DeltaLogging {
parts = cv.numParts,
sizeInBytes = None,
numOfAddFiles = None,
checkpointSchema = None)
checkpointSchema = None
)
}

/**
Expand Down Expand Up @@ -731,7 +740,8 @@ object Checkpoints extends DeltaLogging {
parts = numPartsOption,
sizeInBytes = Some(checkpointSizeInBytes),
numOfAddFiles = Some(snapshot.numOfFiles),
checkpointSchema = checkpointSchemaToWriteInLastCheckpointFile(spark, schema))
checkpointSchema = checkpointSchemaToWriteInLastCheckpointFile(spark, schema)
)
}

/**
Expand Down Expand Up @@ -859,18 +869,32 @@ object CheckpointV2 {
}

/**
* A trait which provides functionality to retrieve the underlying files for a Checkpoint.
* A trait which provides information about a checkpoint to the Snapshot.
* - files in the underlying checkpoint
* - metadata of the underlying checkpoint
*/
trait CheckpointFileListProvider {
trait CheckpointProvider {
def checkpointFiles: Seq[FileStatus]
def checkpointMetadata: CheckpointMetaData
}

/**
* An implementation of [[CheckpointFileListProvider]] where the information about checkpoint files
* An implementation of [[CheckpointProvider]] where the information about checkpoint files
* (i.e. Seq[FileStatus]) is already known in advance.
*
* @param checkpointFiles - file statuses for the checkpoint
* @param checkpointMetadataOpt - optional checkpoint metadata for the checkpoint.
* If this is passed, the provider will use it instead of deriving the
* [[CheckpointMetaData]] from the file list.
*/
case class PreloadedCheckpointFileProvider(
override val checkpointFiles: Seq[FileStatus]) extends CheckpointFileListProvider {
case class PreloadedCheckpointProvider(
override val checkpointFiles: Seq[FileStatus],
checkpointMetadataOpt: Option[CheckpointMetaData]
) extends CheckpointProvider {

override def checkpointMetadata: CheckpointMetaData = {
checkpointMetadataOpt.getOrElse(CheckpointMetaData.fromFiles(checkpointFiles))
}

require(checkpointFiles.nonEmpty, "There should be atleast 1 checkpoint file")
}
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class Snapshot(
val logSegment: LogSegment,
override val deltaLog: DeltaLog,
val timestamp: Long,
val checksumOpt: Option[VersionChecksum],
checkpointMetadataOpt: Option[CheckpointMetaData] = None)
val checksumOpt: Option[VersionChecksum]
)
extends SnapshotDescriptor
with SnapshotStateManager
with StateCache
Expand Down Expand Up @@ -354,7 +354,8 @@ class Snapshot(
}
}

def getCheckpointMetadataOpt: Option[CheckpointMetaData] = checkpointMetadataOpt
def getCheckpointMetadataOpt: Option[CheckpointMetaData] =
logSegment.checkpointProviderOpt.map(_.checkpointMetadata)

def redactedPath: String =
Utils.redact(spark.sessionState.conf.stringRedactionPattern, path.toUri.toString)
Expand Down
Loading

0 comments on commit 08d6f63

Please sign in to comment.