[HUDI-7034] Fix refresh table/view (apache#10151)
* [HUDI-7034] Refresh index fix - remove cached file slices within partitions

---------

Co-authored-by: vmakarevich <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
3 people authored Nov 23, 2023
1 parent b77eff2 commit bcb974b
Showing 2 changed files with 64 additions and 1 deletion.
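
Context for the diff below: the commit title refers to Spark's REFRESH TABLE / REFRESH VIEW path, which re-triggers the Hudi file index's refresh. A hypothetical Scala repro of the symptom being fixed (table name and queries are illustrative, not from this commit):

// Hypothetical repro sketch of the stale-listing symptom:
spark.sql("SELECT count(*) FROM hudi_tbl")  // file index caches partition paths and file slices
// ... another writer commits a new batch to the table ...
spark.sql("REFRESH TABLE hudi_tbl")         // before this fix: partition paths were reset,
                                            // but the per-partition file-slice cache was kept
spark.sql("SELECT count(*) FROM hudi_tbl")  // so the query could still scan stale file slices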
@@ -428,6 +428,8 @@ private void doRefresh() {

   // Reset it to null to trigger re-loading of all partition path
   this.cachedAllPartitionPaths = null;
+  // Reset to force reload file slices inside partitions
+  this.cachedAllInputFileSlices = new HashMap<>();
   if (!shouldListLazily) {
     ensurePreloadedPartitions(getAllQueryPartitionPaths());
   }
@@ -29,7 +29,7 @@ import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPU
 import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.engine.EngineType
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
@@ -240,6 +240,67 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS
assertEquals(List("2021/03/08", "2021/03/09"), prunedPartitions)
}

@ParameterizedTest
@CsvSource(value = Array("lazy,true", "lazy,false",
"eager,true", "eager,false"))
def testIndexRefreshesFileSlices(listingModeOverride: String,
useMetadataTable: Boolean): Unit = {
def getDistinctCommitTimeFromAllFilesInIndex(files: Seq[PartitionDirectory]): Seq[String] = {
  files.flatMap(_.files)
    .map(fileStatus => new HoodieBaseFile(fileStatus.getPath.toString))
    .map(_.getCommitTime)
    .distinct
}

val r = new Random(0xDEED)
// partition column values are [0, 5)
val tuples = for (i <- 1 to 1000) yield (r.nextString(1000), r.nextInt(5), r.nextString(1000))

val writeOpts = commonOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString)
val _spark = spark // stable identifier required so the implicits can be imported below
import _spark.implicits._
val inputDF = tuples.toDF("_row_key", "partition", "timestamp")
inputDF
.write
.format("hudi")
.options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

val readOpts = queryOpts ++ Map(
HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString,
DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key -> listingModeOverride
)

metaClient = HoodieTableMetaClient.reload(metaClient)
val fileIndexFirstWrite = HoodieFileIndex(spark, metaClient, None, readOpts)

val listFilesAfterFirstWrite = fileIndexFirstWrite.listFiles(Nil, Nil)
val distinctListOfCommitTimesAfterFirstWrite = getDistinctCommitTimeFromAllFilesInIndex(listFilesAfterFirstWrite)
val firstWriteCommitTime = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
assertEquals(1, distinctListOfCommitTimesAfterFirstWrite.size, "Should have only one commit")
assertEquals(firstWriteCommitTime, distinctListOfCommitTimesAfterFirstWrite.head, "All files should belong to the first existing commit")

val nextBatch = for (i <- 0 to 4) yield (r.nextString(1000), i, r.nextString(1000))

nextBatch.toDF("_row_key", "partition", "timestamp")
.write
.format("hudi")
.options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)

fileIndexFirstWrite.refresh()
val fileSlicesAfterSecondWrite = fileIndexFirstWrite.listFiles(Nil, Nil)
val distinctListOfCommitTimesAfterSecondWrite = getDistinctCommitTimeFromAllFilesInIndex(fileSlicesAfterSecondWrite)
metaClient = HoodieTableMetaClient.reload(metaClient)
val lastCommitTime = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp

assertEquals(1, distinctListOfCommitTimesAfterSecondWrite.size, "All base files were rewritten, so only one distinct commit time should remain")
assertEquals(lastCommitTime, distinctListOfCommitTimesAfterSecondWrite.head, "After the index refresh, all files should belong to the second commit")
}
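
The getDistinctCommitTimeFromAllFilesInIndex helper above relies on HoodieBaseFile parsing the commit (instant) time out of the base-file name. As a rough illustration of that convention (assuming Hudi's <fileId>_<writeToken>_<instantTime>.<extension> base-file naming; a simplified sketch, not Hudi's real parser):

// Illustrative only: extract the instant time from a Hudi base-file name,
// assuming the <fileId>_<writeToken>_<instantTime>.<ext> convention.
// The test uses HoodieBaseFile#getCommitTime rather than this helper.
def commitTimeFromFileName(fileName: String): String = {
  val stem = fileName.substring(0, fileName.lastIndexOf('.')) // drop the extension
  stem.substring(stem.lastIndexOf('_') + 1)                   // instant time is the last token
}

// e.g. commitTimeFromFileName("f1a2b3_0-1-0_20231123101530000.parquet") == "20231123101530000"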

@ParameterizedTest
@CsvSource(value = Array("lazy,true,true", "lazy,true,false", "lazy,false,true", "lazy,false,false",
"eager,true,true", "eager,true,false", "eager,false,true", "eager,false,false"))
