Parallel calls
sumeet-db committed Mar 27, 2024
1 parent b0ed777 commit 429e1ba
Showing 5 changed files with 363 additions and 74 deletions.
238 changes: 167 additions & 71 deletions spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta

import java.io.FileNotFoundException
import java.util.Objects
import java.util.concurrent.Future
import java.util.concurrent.{CompletableFuture, Future}
import java.util.concurrent.locks.ReentrantLock

import scala.collection.mutable
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal

import com.databricks.spark.util.TagDefinitions.TAG_ASYNC
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitStore}
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitStore, GetCommitsResponse}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.JsonUtils
@@ -120,84 +120,172 @@ trait SnapshotManagement { self: DeltaLog =>
protected final def listDeltaCompactedDeltaAndCheckpointFiles(
startVersion: Long,
versionToLoad: Option[Long],
includeMinorCompactions: Boolean): Option[Array[FileStatus]] =
includeMinorCompactions: Boolean): Option[Array[FileStatus]] = {
recordDeltaOperation(self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
listFromOrNone(startVersion).map { _
.flatMap {
case DeltaFile(f, fileVersion) =>
Some((f, fileVersion))
case CompactedDeltaFile(f, startVersion, endVersion)
if includeMinorCompactions && versionToLoad.forall(endVersion <= _) =>
Some((f, startVersion))
case CheckpointFile(f, fileVersion) if f.getLen > 0 =>
Some((f, fileVersion))
case ChecksumFile(f, version) if versionToLoad.forall(version <= _) =>
lastSeenChecksumFileStatusOpt = Some(f)
None
case _ =>
None
}
// take files up to the version we want to load
.takeWhile { case (_, fileVersion) => versionToLoad.forall(fileVersion <= _) }
.map(_._1).toArray
listFromFileSystemInternal(startVersion, versionToLoad, includeMinorCompactions)
.map(_.map(_._1))
}
}

private def listFromFileSystemInternal(
startVersion: Long,
versionToLoad: Option[Long],
includeMinorCompactions: Boolean): Option[Array[(FileStatus, FileType.Value, Long)]] = {
listFromOrNone(startVersion).map {
_.flatMap {
case DeltaFile(f, fileVersion) =>
Some((f, FileType.DELTA, fileVersion))
case CompactedDeltaFile(f, startVersion, endVersion)
if includeMinorCompactions && versionToLoad.forall(endVersion <= _) =>
Some((f, FileType.COMPACTED_DELTA, startVersion))
case CheckpointFile(f, fileVersion) if f.getLen > 0 =>
Some((f, FileType.CHECKPOINT, fileVersion))
case ChecksumFile(f, version) if versionToLoad.forall(version <= _) =>
lastSeenChecksumFileStatusOpt = Some(f)
None
case _ =>
None
}
// take files up to the version we want to load
.takeWhile { case (_, _, fileVersion) => versionToLoad.forall(fileVersion <= _) }
.toArray
}
}
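
For reference, a minimal sketch of how the FileNames extractors used above classify
_delta_log entries; the classify helper is hypothetical, but the extractor bindings are
exactly the ones this match relies on:

import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.delta.util.FileNames._

// Each extractor parses the version(s) out of the file name; anything that does not
// match (temp files, unrelated entries) falls through to None.
def classify(f: FileStatus): Option[(String, Long)] = f match {
  case DeltaFile(_, version) => Some(("delta", version))
  case CompactedDeltaFile(_, startV, endV) => Some(("compacted-delta", startV)) // spans [startV, endV]
  case CheckpointFile(_, version) if f.getLen > 0 => Some(("checkpoint", version)) // ignore zero-byte checkpoints
  case ChecksumFile(_, version) => Some(("checksum", version))
  case _ => None
}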

/**
* This method is designed to efficiently and reliably list delta, compacted delta, and
* checkpoint files associated with a Delta Lake table. It makes parallel calls to both the
* file-system and a commit-store (if available), reconciles the results to account for
* asynchronous backfill operations, and ensures a comprehensive list of file statuses without
* missing any concurrently backfilled files.
*
* @param startVersion the version to start. Inclusive.
* @param commitStoreOpt the optional commit store to use for fetching un-backfilled commits.
* @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
* @param includeMinorCompactions Whether to include minor compaction files in the result
* @return Some array of files found (possibly empty, if no usable commit files are present), or
* None if the listing returned no files at all.
*/
protected final def listDeltaCompactedDeltaAndCheckpointFilesWithCommitStore(
startVersion: Long,
commitStoreOpt: Option[CommitStore],
versionToLoad: Option[Long],
includeMinorCompactions: Boolean): Option[Array[FileStatus]] = recordDeltaOperation(
self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
// TODO(managed-commits): Make sure all usage of `listDeltaCompactedDeltaAndCheckpointFiles`
// are replaced with this method.
val resultFromCommitStore = recordFrameProfile("DeltaLog", "CommitStore.getCommits") {
commitStoreOpt match {
case Some(cs) => cs.getCommits(logPath, startVersion, endVersion = versionToLoad).commits
case None => Seq.empty
includeMinorCompactions: Boolean): Option[Array[FileStatus]] = {
recordDeltaOperation(self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
// TODO(managed-commits): Make sure all usage of `listDeltaCompactedDeltaAndCheckpointFiles`
// are replaced with this method.
val commitStore = commitStoreOpt.getOrElse {
return listFromFileSystemInternal(startVersion, versionToLoad, includeMinorCompactions)
.map(_.map(_._1))
}
}

var maxDeltaVersionSeen = startVersion - 1
val resultTuplesFromFsListingOpt = recordFrameProfile("DeltaLog", "listFromOrNone") {
listFromOrNone(startVersion).map {
_.flatMap {
case DeltaFile(f, fileVersion) =>
// Ideally listFromOrNone should return lexicographically sorted files and so
// maxDeltaVersionSeen should be equal to fileVersion. But we are being defensive
// here and taking max of all the fileVersions seen.
maxDeltaVersionSeen = math.max(maxDeltaVersionSeen, fileVersion)
Some((f, FileType.DELTA, fileVersion))
case CompactedDeltaFile(f, startVersion, endVersion)
if includeMinorCompactions && versionToLoad.forall(endVersion <= _) =>
Some((f, FileType.COMPACTED_DELTA, startVersion))
case CheckpointFile(f, fileVersion) if f.getLen > 0 =>
Some((f, FileType.CHECKPOINT, fileVersion))
case ChecksumFile(f, version) if versionToLoad.forall(version <= _) =>
lastSeenChecksumFileStatusOpt = Some(f)
None
case _ =>
None
}
// take files up to the version we want to load
.takeWhile { case (_, _, fileVersion) => versionToLoad.forall(fileVersion <= _) }
.toArray
// Submit a potential async call to get commits from commit store if available
val threadPool = SnapshotManagement.commitStoreGetCommitsThreadPool
def getCommitsTask(async: Boolean): GetCommitsResponse = {
recordFrameProfile("DeltaLog", s"CommitStore.getCommits.async=$async") {
commitStore.getCommits(logPath, startVersion, endVersion = versionToLoad)
}
}
val unbackfilledCommitsResponseFuture =
if (threadPool.getActiveCount < threadPool.getMaximumPoolSize) {
threadPool.submit[GetCommitsResponse](spark) { getCommitsTask(async = true) }
} else {
// If the thread pool is full, we should not submit more tasks to it. Instead, we should
// run the task in the current thread.
logInfo("Getting un-backfilled commits from commit store in the same thread for table " +
s"$dataPath")
recordDeltaEvent(
this,
"delta.listDeltaAndCheckpointFiles.synchronousCommitStoreGetCommits")
CompletableFuture.completedFuture(getCommitsTask(async = false))
}

var maxDeltaVersionSeen = startVersion - 1
val initialLogTuplesFromFsListingOpt =
listFromFileSystemInternal(startVersion, versionToLoad, includeMinorCompactions)
// Ideally listFromFileSystemInternal should return lexicographically sorted files and so
// maxDeltaVersionSeen should be equal to the last delta version. But we are being
// defensive here and taking max of all the delta fileVersions seen.
initialLogTuplesFromFsListingOpt.foreach { logTuples =>
logTuples.filter(_._2 == FileType.DELTA).map(_._3).foreach { deltaVersion =>
maxDeltaVersionSeen = Math.max(maxDeltaVersionSeen, deltaVersion)
}
}
val unbackfilledCommitsResponse = try {
unbackfilledCommitsResponseFuture.get()
} catch {
case e: java.util.concurrent.ExecutionException => throw e.getCause
}

def requiresAdditionalListing(): Boolean = {
// A gap in delta versions may occur if some delta files are backfilled "after" the
// file-system listing but before the commit-store listing. To handle this scenario, we
// perform an additional listing from the file system because those missing files would be
// backfilled by now and show up in the file-system.
// Note: We only care about missing delta files with version <= versionToLoad
val areDeltaFilesMissing = unbackfilledCommitsResponse.commits.headOption match {
case Some(commit) =>
// Missing Delta files: [maxDeltaVersionSeen + 1, commit.version - 1]
maxDeltaVersionSeen + 1 < commit.version
case None =>
// Missing Delta files: [maxDeltaVersionSeen + 1, latestTableVersion]
// When there are no commits, we should consider the latestTableVersion from the commit
// store to detect if ALL trailing commits were concurrently backfilled.
unbackfilledCommitsResponse.latestTableVersion >= 0 &&
maxDeltaVersionSeen < unbackfilledCommitsResponse.latestTableVersion
}
versionToLoad.forall(maxDeltaVersionSeen < _) && areDeltaFilesMissing
}

val additionalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] =
if (requiresAdditionalListing()) {
recordDeltaEvent(this, "delta.listDeltaAndCheckpointFiles.requiresAdditionalFsListing")
listFromFileSystemInternal(
startVersion = maxDeltaVersionSeen + 1, versionToLoad, includeMinorCompactions)
} else {
None
}
additionalLogTuplesFromFsListingOpt.foreach { logTuples =>
logTuples.filter(_._2 == FileType.DELTA).map(_._3).foreach { deltaVersion =>
maxDeltaVersionSeen = Math.max(maxDeltaVersionSeen, deltaVersion)
}
}
if (requiresAdditionalListing()) {
// After the additional listing, there should be no remaining gap between the
// file-system versions and the commit-store versions.
val eventData = Map(
"initialLogsFromFsListingOpt" ->
initialLogTuplesFromFsListingOpt.map(_.map(_._1.getPath.toString)),
"additionalLogsFromFsListingOpt" ->
additionalLogTuplesFromFsListingOpt.map(_.map(_._1.getPath.toString)),
"maxDeltaVersionSeen" -> maxDeltaVersionSeen,
"unbackfilledCommits" ->
unbackfilledCommitsResponse.commits.map(commit => commit.fileStatus.getPath.toString),
"latestCommitVersion" -> unbackfilledCommitsResponse.latestTableVersion)
recordDeltaEvent(
deltaLog = this,
opType = "delta.listDeltaAndCheckpointFiles.unexpectedRequiresAdditionalFsListing",
data = eventData)
}

val finalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] =
(initialLogTuplesFromFsListingOpt, additionalLogTuplesFromFsListingOpt) match {
case (Some(initial), Some(additional)) => Some(initial ++ additional)
case (Some(initial), None) => Some(initial)
case (None, Some(additional)) => Some(additional)
case _ => None
}

val unbackfilledCommitsFiltered = unbackfilledCommitsResponse.commits
.dropWhile(_.version <= maxDeltaVersionSeen)
.takeWhile(commit => versionToLoad.forall(commit.version <= _))
.map(_.fileStatus)

// If the result from the fs listing is None and the result from the commit-store is
// empty, return None. The caller uses this to determine whether the table exists at all.
finalLogTuplesFromFsListingOpt.map { logTuplesFromFsListing =>
logTuplesFromFsListing.map(_._1) ++ unbackfilledCommitsFiltered
}
}
val resultFromCommitStoreFiltered = resultFromCommitStore
.dropWhile(_.version <= maxDeltaVersionSeen)
.takeWhile(commit => versionToLoad.forall(commit.version <= _))
.map(_.fileStatus)
.toArray
if (resultTuplesFromFsListingOpt.isEmpty && resultFromCommitStoreFiltered.nonEmpty) {
throw new IllegalStateException("No files found from the file system listing, but " +
s"files found from the commit store. This is unexpected. Commit Files: " +
s"${resultFromCommitStoreFiltered.map(_.getPath).mkString("Array(", ", ", ")")}")
}
// If the result from the fs listing is None and the result from the commit-store is
// empty, return None. The caller uses this to determine whether the table exists at all.
resultTuplesFromFsListingOpt.map { resultTuplesFromFsListing =>
resultTuplesFromFsListing.map(_._1) ++ resultFromCommitStoreFiltered
}
}
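
A condensed model of the reconciliation above, with the profiling, logging, and
thread-pool plumbing stripped out; CommitInfo and CommitsResponse are simplified
stand-ins for the real Commit and GetCommitsResponse types:

// The file-system listing and the commit-store call race against asynchronous
// backfills, so a gap in delta versions between them means some commits were
// backfilled after the listing; one more listing from the gap start closes it.
case class CommitInfo(version: Long)
case class CommitsResponse(commits: Seq[CommitInfo], latestTableVersion: Long)

def needsRelisting(
    maxDeltaVersionSeen: Long, // highest delta version the file system returned
    response: CommitsResponse,
    versionToLoad: Option[Long]): Boolean = {
  val deltaFilesMissing = response.commits.headOption match {
    // Gap between the last file-system delta and the first un-backfilled commit.
    case Some(first) => maxDeltaVersionSeen + 1 < first.version
    // No un-backfilled commits left: the file system must already cover everything
    // up to the commit store's latestTableVersion.
    case None =>
      response.latestTableVersion >= 0 &&
        maxDeltaVersionSeen < response.latestTableVersion
  }
  versionToLoad.forall(maxDeltaVersionSeen < _) && deltaFilesMissing
}

// Example: the listing saw deltas up to v7 but the first un-backfilled commit is v10,
// so v8 and v9 were backfilled concurrently and a re-listing from v8 is needed.
assert(needsRelisting(7, CommitsResponse(Seq(CommitInfo(10)), latestTableVersion = 10), None))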

@@ -1104,7 +1192,8 @@ trait SnapshotManagement { self: DeltaLog =>
.map(manuallyLoadCheckpoint)
createLogSegment(
versionToLoad = Some(version),
lastCheckpointInfo = lastCheckpointInfoHint
lastCheckpointInfo = lastCheckpointInfoHint,
commitStoreOpt = current.commitStoreOpt
).map { segment =>
createSnapshot(
initSegment = segment,
@@ -1133,6 +1222,13 @@ object SnapshotManagement {
new DeltaThreadPool(tpe)
}

private lazy val commitStoreGetCommitsThreadPool = {
val numThreads = SparkSession.active.sessionState.conf
.getConf(DeltaSQLConf.DELTA_LIST_FROM_COMMIT_STORE_THREAD_POOL_SIZE)
val tpe = ThreadUtils.newDaemonCachedThreadPool("commit-store-get-commits", numThreads)
new DeltaThreadPool(tpe)
}

/**
* - Verify the versions are contiguous.
* - Verify the versions start with `expectedStartVersion` if it's specified.
@@ -1155,7 +1251,7 @@ object SnapshotManagement {
s"file version: $v to compute Snapshot")
}
expectedEndVersion.foreach { v =>
require(versions.nonEmpty && versions.last == v, "Did not get the first delta " +
require(versions.nonEmpty && versions.last == v, "Did not get the last delta " +
s"file version: $v to compute Snapshot")
}
}
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -506,6 +506,14 @@ trait DeltaSQLConfBase {
.checkValue(_ > 0, "threadPoolSize must be positive")
.createWithDefault(20)

val DELTA_LIST_FROM_COMMIT_STORE_THREAD_POOL_SIZE =
buildStaticConf("commitStore.getCommits.threadPoolSize")
.internal()
.doc("The size of the thread pool for listing files from the CommitStore.")
.intConf
.checkValue(_ > 0, "threadPoolSize must be positive")
.createWithDefault(5)
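
Being a static conf, this cannot be changed once the session is up; a usage sketch,
assuming the standard "spark.databricks.delta." prefix that DeltaSQLConf applies to
its keys:

import org.apache.spark.sql.SparkSession

// Hypothetical session setup raising the getCommits pool from the default 5 to 8 threads.
val spark = SparkSession.builder()
  .appName("managed-commits-example")
  .config("spark.databricks.delta.commitStore.getCommits.threadPoolSize", "8")
  .getOrCreate()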

val DELTA_ASSUMES_DROP_CONSTRAINT_IF_EXISTS =
buildConf("constraints.assumesDropIfExists.enabled")
.doc("""If true, DROP CONSTRAINT quietly drops nonexistent constraints even without
spark/src/main/scala/org/apache/spark/sql/delta/util/threads/DeltaThreadPool.scala
@@ -31,6 +31,9 @@ import org.apache.spark.util.ThreadUtils.namedThreadFactory

/** A wrapper for [[ThreadPoolExecutor]] whose tasks run with the caller's [[SparkSession]]. */
private[delta] class DeltaThreadPool(tpe: ThreadPoolExecutor) {
def getActiveCount: Int = tpe.getActiveCount
def getMaximumPoolSize: Int = tpe.getMaximumPoolSize

/** Submits a task for execution and returns a [[Future]] representing that task. */
def submit[T](spark: SparkSession)(body: => T): Future[T] = {
tpe.submit { () => spark.withActive(body) }
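
The two new accessors let callers detect saturation before queueing work; a minimal
sketch of the submit-or-run-inline pattern that SnapshotManagement layers on top of
them (submitOrRunInline is a hypothetical helper):

import java.util.concurrent.{CompletableFuture, Future}
import org.apache.spark.sql.SparkSession

// Submit asynchronously while the pool has an idle thread; otherwise run the body on
// the caller's thread so no one blocks behind queued getCommits calls.
def submitOrRunInline[T](pool: DeltaThreadPool, spark: SparkSession)(body: => T): Future[T] = {
  if (pool.getActiveCount < pool.getMaximumPoolSize) {
    pool.submit(spark)(body)
  } else {
    CompletableFuture.completedFuture(body)
  }
}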
spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.util.FileNames
import io.delta.tables.{DeltaTable => IODeltaTable}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach

@@ -248,6 +249,16 @@ trait DeltaTestUtilsBase {
.map(findIfResponsible[E](_))
.collectFirst { case Some(culprit) => culprit }
}

def verifyBackfilled(file: FileStatus): Unit = {
val unbackfilled = file.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString)
assert(!unbackfilled, s"File $file was not backfilled")
}

def verifyUnbackfilled(file: FileStatus): Unit = {
val unbackfilled = file.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString)
assert(unbackfilled, s"File $file was backfilled")
}
}
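
A hypothetical test flow for the new helpers, assuming a managed-commits table whose
newest commits are not yet backfilled; tablePath and the table setup are elided, and
DeltaLog.forTable / LogSegment.deltas are the standard APIs:

// After update(), every delta file the snapshot resolved is either a classic
// backfilled NNNN.json file or a UUID-named un-backfilled commit file.
val log = DeltaLog.forTable(spark, tablePath)
val segment = log.update().logSegment
val (unbackfilled, backfilled) =
  segment.deltas.partition(_.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString))
backfilled.foreach(verifyBackfilled)
unbackfilled.foreach(verifyUnbackfilled)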

trait DeltaCheckpointTestUtils