Skip to content

Commit

Permalink
Fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Mar 29, 2024
1 parent f37dc86 commit 7d693d2
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class DeltaHistoryManager(
}

/**
* Get the commit information of the Delta table from commit `[start, end)`. If `end` is `None`,
* Get the commit information of the Delta table from commit `[start, end]`. If `end` is `None`,
* we return all commits from start to now.
*/
def getHistory(
Expand All @@ -77,7 +77,7 @@ class DeltaHistoryManager(
import org.apache.spark.sql.delta.implicits._
val conf = getSerializableHadoopConf
val logPath = deltaLog.logPath.toString
val snapshot = endOpt.map(end => deltaLog.getSnapshotAt(end - 1)).getOrElse(deltaLog.update())
val snapshot = endOpt.map(end => deltaLog.getSnapshotAt(end)).getOrElse(deltaLog.update())
val commitFileProvider = DeltaCommitFileProvider(snapshot)
// We assume that commits are contiguous, therefore we try to load all of them in order
val info = spark.range(start, snapshot.version + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,25 @@ abstract class DeltaHistoryManagerBase extends DeltaTimeTravelTests
}
}

test("getHistory returns the correct set of commits") {
val tblName = "delta_table"
withTable(tblName) {
val start = 1540415658000L
generateCommits(tblName, start, start + 20.minutes, start + 40.minutes, start + 60.minutes)
val deltaLog = DeltaLog.forTable(spark, getTableLocation(tblName))
val history_02 = deltaLog.history.getHistory(start = 0, endOpt = Some(2))
assert(history_02.size == 3)
assert(history_02.map(_.getVersion) == Seq(2, 1, 0))

val history_13 = deltaLog.history.getHistory(start = 1, endOpt = Some(1))
assert(history_13.size == 1)
assert(history_13.map(_.getVersion) == Seq(1))

val history_2 = deltaLog.history.getHistory(start = 2, endOpt = None)
assert(history_2.size == 2)
assert(history_2.map(_.getVersion) == Seq(3, 2))
}
}
}

/** Uses V2 resolution code paths */
Expand Down

0 comments on commit 7d693d2

Please sign in to comment.