Skip to content

Commit

Permalink
DataPathHandler Bug Fix
Browse files Browse the repository at this point in the history
Fixing a bug so that external data paths don't accidentally trigger the HDFS path when executing local path logic.
  • Loading branch information
blee1234 committed Aug 1, 2022
1 parent 9c1c497 commit 2eb7460
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private[offline] object DataSourceAccessor {
failOnMissingPartition: Boolean,
addTimestampColumn: Boolean,
dataLoaderHandlers: List[DataLoaderHandler]): DataSourceAccessor = {
val pathChecker = PathChecker(ss)
val pathChecker = PathChecker(ss, dataLoaderHandlers)
val fileLoaderFactory = DataLoaderFactory(ss = ss, dataLoaderHandlers = dataLoaderHandlers)
val partitionLimiter = new PartitionLimiter(ss)
val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.linkedin.feathr.offline.source.pathutil

import com.linkedin.feathr.offline.util.{HdfsUtils, LocalFeatureJoinUtils, SourceUtils}
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
import org.apache.hadoop.conf.Configuration

/**
* path checker for local test files.
* @param hadoopConf hadoop configuration
*/
private[offline] class LocalPathChecker(hadoopConf: Configuration) extends PathChecker {
private[offline] class LocalPathChecker(hadoopConf: Configuration, dataLoaderHandlers: List[DataLoaderHandler]) extends PathChecker {

private val TEST_AVRO_JSON_FILE = "/data.avro.json"

Expand All @@ -20,13 +21,33 @@ private[offline] class LocalPathChecker(hadoopConf: Configuration) extends PathC
LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, None).isDefined
}

/**
 * Check whether the input path should be treated as an Hdfs path.
 * A path counts as Hdfs only when none of the configured data loader handlers accepts it
 * through validatePath; when no handlers are configured, every path is treated as Hdfs.
 * exists() uses this to decide whether the HdfsUtils lookup should be attempted.
 * @param path input path.
 * @return true if no data loader handler validates the path, false otherwise.
 */
def isHdfs(path: String): Boolean = {
  // `exists` stops at the first handler that accepts the path, so no manual break is needed.
  !dataLoaderHandlers.exists(_.validatePath(path))
}

/**
* check whether the input path exists. It will try different formats for local test.
* @param path input path.
* @return true if the path exists.
*/
override def exists(path: String): Boolean = {
if (HdfsUtils.exists(path)) return true
if (isHdfs(path) && HdfsUtils.exists(path)) return true
if (LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, None).isDefined) return true
if (getClass.getClassLoader.getResource(path) != null) return true
if (getClass.getClassLoader.getResource(path + TEST_AVRO_JSON_FILE) != null) return true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.feathr.offline.source.pathutil

import org.apache.spark.sql.SparkSession
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler

/**
* Common path utility functions
Expand All @@ -25,8 +26,8 @@ private[offline] trait PathChecker {
* It will construct a specific path checker according to the spark session.
*/
private[offline] object PathChecker {
def apply(ss : SparkSession): PathChecker = {
if (ss.sparkContext.isLocal) new LocalPathChecker(ss.sparkContext.hadoopConfiguration)
def apply(ss : SparkSession, dataLoaderHandlers: List[DataLoaderHandler]): PathChecker = {
if (ss.sparkContext.isLocal) new LocalPathChecker(ss.sparkContext.hadoopConfiguration, dataLoaderHandlers)
else new HdfsPathChecker()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH

// Only file-based source has real "path", others are just single dataset
val adjustedObsTimeRange = if (factDataSource.location.isFileBasedLocation()) {
val pathChecker = PathChecker(ss)
val pathChecker = PathChecker(ss, dataLoaderHandlers)
val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers)
val pathInfo = pathAnalyzer.analyze(factDataSource.path)
if (pathInfo.dateTimeResolution == DateTimeResolution.DAILY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[offline] object SourceUtils {
sourceFormatType match {
case SourceFormatType.FIXED_PATH => Seq(HdfsUtils.getLatestPath(sourcePath, ss.sparkContext.hadoopConfiguration))
case SourceFormatType.TIME_PATH =>
val pathChecker = PathChecker(ss)
val pathChecker = PathChecker(ss, dataLoaderHandlers)
val pathGenerator = new TimeBasedHdfsPathGenerator(pathChecker)
val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers)
val pathInfo = pathAnalyzer.analyze(sourcePath)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.linkedin.feathr.offline.source.pathutil

import com.linkedin.feathr.offline.TestFeathr
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.mapred.JobConf
import org.mockito.Mockito.when
import org.scalatest.mockito.MockitoSugar
import org.testng.Assert.{assertEquals, assertTrue}
Expand All @@ -14,13 +18,13 @@ class TestPathChecker extends TestFeathr with MockitoSugar {

@Test(description = "test creation of PathChecker")
def testCreateDataSourcePathChecker(): Unit = {
val localPathChecker = PathChecker(ss)
val localPathChecker = PathChecker(ss, List())
assertTrue(localPathChecker.isInstanceOf[LocalPathChecker])
val mockSparkSession = mock[SparkSession]
val mockSparkContext = mock[SparkContext]
when(mockSparkSession.sparkContext).thenReturn(mockSparkContext)
when(mockSparkContext.isLocal).thenReturn(false)
val defaultPathChecker = PathChecker(mockSparkSession)
val defaultPathChecker = PathChecker(mockSparkSession, List())
assertTrue(defaultPathChecker.isInstanceOf[HdfsPathChecker])
}

Expand All @@ -35,10 +39,36 @@ class TestPathChecker extends TestFeathr with MockitoSugar {

@Test(description = "test exists method for LocalPathChecker")
def testLocalPathCheckerExists() : Unit = {
val localPathChecker = new LocalPathChecker(new Configuration())
val localPathChecker = new LocalPathChecker(new Configuration(), List())
assertEquals(localPathChecker.exists("src/test/resources/anchor1-source.csv"), true)
assertEquals(localPathChecker.exists("anchor1-source.csv"), true)
assertEquals(localPathChecker.exists("generation/daily/2019/05/19"), true)
assertEquals(localPathChecker.exists("non-existing_path"), false)
}

@Test(description = "test isHdfs method for LocalPathChecker")
def testLocalPathCheckerIsHdfs() : Unit = {
  // Stub loader callbacks; they only need to satisfy the DataLoaderHandler constructor.
  val stubCreateDataFrame =
    (path: String, daliParameters: Map[String, String], jobConf: JobConf) => mock[DataFrame]
  val stubUnionDataFrame =
    (paths: Seq[String], parameters: Map[String, String], jobConf: JobConf) => mock[DataFrame]
  val stubWriteDataFrame =
    (df: DataFrame, path: String, parameters: Map[String, String]) => {}

  // Handler that accepts exactly the path "xyz://" and nothing else.
  val xyzHandler = DataLoaderHandler(
    validatePath = (path: String) => path == "xyz://",
    createDataFrame = stubCreateDataFrame,
    createUnionDataFrame = stubUnionDataFrame,
    writeDataFrame = stubWriteDataFrame
  )
  val checker = new LocalPathChecker(new Configuration(), List(xyzHandler))

  // A path accepted by a handler is not Hdfs; a path no handler accepts is.
  assertTrue(!checker.isHdfs("xyz://"))
  assertTrue(checker.isHdfs("file://"))
}
}

0 comments on commit 2eb7460

Please sign in to comment.