Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Jan 24, 2025
1 parent ba8246e commit 4c6a681
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 8 deletions.
25 changes: 21 additions & 4 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ trait RecordChecksum extends DeltaLogging {
* @param deltaLog The DeltaLog
* @param versionToCompute The version for which we want to compute the checksum
* @param actions The actions corresponding to the version `versionToCompute`
* @param metadata The metadata corresponding to the version `versionToCompute`
* @param protocol The protocol corresponding to the version `versionToCompute`
* @param metadataOpt The metadata corresponding to the version `versionToCompute` (if known)
* @param protocolOpt The protocol corresponding to the version `versionToCompute` (if known)
* @param operationName The operation name corresponding to the version `versionToCompute`
* @param txnIdOpt The transaction identifier for the version `versionToCompute`
* @param previousVersionState Contains either the versionChecksum corresponding to
Expand All @@ -156,8 +156,8 @@ trait RecordChecksum extends DeltaLogging {
deltaLog: DeltaLog,
versionToCompute: Long,
actions: Seq[Action],
metadata: Metadata,
protocol: Protocol,
metadataOpt: Option[Metadata],
protocolOpt: Option[Protocol],
operationName: String,
txnIdOpt: Option[String],
previousVersionState: Either[Snapshot, VersionChecksum],
Expand Down Expand Up @@ -215,6 +215,23 @@ trait RecordChecksum extends DeltaLogging {
RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName)
val ignoreRemoveFilesInOperation =
RecordChecksum.operationNamesWhereRemoveFilesIgnoredForIncrementalCrc.contains(operationName)
// Retrieve protocol/metadata in order of precedence:
// 1. Use provided protocol/metadata if available
// 2. Look for a protocol/metadata action in the incremental set of actions to be applied
// 3. Use protocol/metadata from previous version's checksum
// 4. Return PROTOCOL_MISSING/METADATA_MISSING error if all attempts fail
val protocol = protocolOpt
.orElse(actions.collectFirst { case p: Protocol => p })
.orElse(Option(oldVersionChecksum.protocol))
.getOrElse {
return Left("PROTOCOL_MISSING")
}
val metadata = metadataOpt
.orElse(actions.collectFirst { case m: Metadata => m })
.orElse(Option(oldVersionChecksum.metadata))
.getOrElse {
return Left("METADATA_MISSING")
}
val persistentDVsOnTableReadable =
DeletionVectorUtils.deletionVectorsReadable(protocol, metadata)
val persistentDVsOnTableWritable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2554,8 +2554,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
deltaLog,
attemptVersion,
actions = currentTransactionInfo.finalActionsToCommit,
metadata = currentTransactionInfo.metadata,
protocol = currentTransactionInfo.protocol,
metadataOpt = Some(currentTransactionInfo.metadata),
protocolOpt = Some(currentTransactionInfo.protocol),
operationName = currentTransactionInfo.op.name,
txnIdOpt = Some(currentTransactionInfo.txnId),
previousVersionState = scala.Left(snapshot),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ class Snapshot(
* checksum file. If the checksum file is not present or if the protocol or metadata is missing
* this will return None.
*/
protected def getProtocolMetadataAndIctFromCrc():
protected def getProtocolMetadataAndIctFromCrc(checksumOpt: Option[VersionChecksum]):
Option[Array[ReconstructedProtocolMetadataAndICT]] = {
if (!spark.sessionState.conf.getConf(
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) {
Expand Down Expand Up @@ -431,7 +431,7 @@ class Snapshot(
Array[ReconstructedProtocolMetadataAndICT] = {
import implicits._

getProtocolMetadataAndIctFromCrc().foreach { protocolMetadataAndIctFromCrc =>
getProtocolMetadataAndIctFromCrc(checksumOpt).foreach { protocolMetadataAndIctFromCrc =>
return protocolMetadataAndIctFromCrc
}

Expand Down

0 comments on commit 4c6a681

Please sign in to comment.