Adding custom data loader handler support. (#309)
* Adding custom data loader handler support.

* Fixing a bug in FeatureGenExperimentComponent

* Removing data path handler support from FeatureGenExperimentComponent

This was causing some build failures in the online e2e tests.

* Adding documentation to handlers.

Adding logs and fixing a Python bug.

* Removing loadSourceDataFrame data handler support for now to pass online e2e tests.
blee1234 authored Jun 2, 2022
1 parent a5b202a commit 3dc3549
Showing 51 changed files with 559 additions and 310 deletions.
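
The core of the change is a new dataPathHandlers hook threaded from the FeathrClient builder down to the data loaders (see the FeathrClient.scala diff below). A minimal usage sketch follows; it assumes the existing FeathrClient.builder(sparkSession)/build() entry points and a user-supplied handler instance, neither of which is shown in this diff:

// Hedged usage sketch: registering custom data path handlers on the client builder.
// `myHandler` is a placeholder for a user-provided DataPathHandler, and the
// builder()/build() entry points and import paths are assumptions, not part of this diff.
import com.linkedin.feathr.offline.client.FeathrClient
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import org.apache.spark.sql.SparkSession

def buildClient(ss: SparkSession, myHandler: DataPathHandler): FeathrClient =
  FeathrClient.builder(ss)
    .addDataPathHandler(myHandler)         // register a single handler
    .addDataPathHandlers(List(myHandler))  // or prepend a whole list of handlers
    // ... feature definition configs omitted ...
    .build()
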
1 change: 1 addition & 0 deletions .gitignore
@@ -206,3 +206,4 @@ feathr_project/feathr_cli.egg-info/*

#Local Build
null/*
.bsp/sbt.json
2 changes: 1 addition & 1 deletion feathr_project/feathr/feathr_pyspark_driver_template.py
@@ -61,7 +61,7 @@ def submit_spark_job(feature_names_funcs):

print("submit_spark_job: Load DataFrame from Scala engine.")

-dataframeFromSpark = py4j_feature_job.loadSourceDataframe(job_param_java_array, set(feature_names_funcs.keys()))
+dataframeFromSpark = py4j_feature_job.loadSourceDataframe(job_param_java_array, set(feature_names_funcs.keys())) # TODO: Add data handler support here
print("Submit_spark_job: dataframeFromSpark: ")
print(dataframeFromSpark)

@@ -9,6 +9,7 @@ import com.linkedin.feathr.offline.job._
import com.linkedin.feathr.offline.join.DataFrameFeatureJoiner
import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlanner}
import com.linkedin.feathr.offline.source.DataSource
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import com.linkedin.feathr.offline.util.{FeathrUtils, _}
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -26,7 +27,7 @@ import scala.util.{Failure, Success}
*
*/
class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups: FeatureGroups, logicalPlanner: MultiStageJoinPlanner,
-featureGroupsUpdater: FeatureGroupsUpdater) {
+featureGroupsUpdater: FeatureGroupsUpdater, dataPathHandlers: List[DataPathHandler]) {
private val log = Logger.getLogger(getClass)

type KeyTagStringTuple = Seq[String]
@@ -83,14 +84,14 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
val keyTaggedDerivedFeatures = FeatureGenKeyTagAnalyzer.inferKeyTagsForDerivedFeatures(featureGenSpec, featureGroups, keyTaggedAnchoredFeatures)
val keyTaggedRequiredFeatures = keyTaggedAnchoredFeatures ++ keyTaggedDerivedFeatures
if (isStreaming(featureGenSpec)) {
-val streamingFeatureGenerator = new StreamingFeatureGenerator()
+val streamingFeatureGenerator = new StreamingFeatureGenerator(dataPathHandlers=dataPathHandlers)
streamingFeatureGenerator.generateFeatures(sparkSession, featureGenSpec, featureGroups, keyTaggedRequiredFeatures)
Map()
} else {
// Get logical plan
val logicalPlan = logicalPlanner.getLogicalPlan(featureGroups, keyTaggedRequiredFeatures)
// This pattern is consistent with the join use case which uses DataFrameFeatureJoiner.
-val dataFrameFeatureGenerator = new DataFrameFeatureGenerator(logicalPlan)
+val dataFrameFeatureGenerator = new DataFrameFeatureGenerator(logicalPlan=logicalPlan,dataPathHandlers=dataPathHandlers)
val featureMap: Map[TaggedFeatureName, (DataFrame, Header)] =
dataFrameFeatureGenerator.generateFeaturesAsDF(sparkSession, featureGenSpec, featureGroups, keyTaggedRequiredFeatures)

@@ -262,7 +263,7 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
s"Please rename feature ${conflictFeatureNames} or rename the same field names in the observation data.")
}

-val joiner = new DataFrameFeatureJoiner(logicalPlan)
+val joiner = new DataFrameFeatureJoiner(logicalPlan=logicalPlan,dataPathHandlers=dataPathHandlers)
joiner.joinFeaturesAsDF(sparkSession, joinConfig, updatedFeatureGroups, keyTaggedFeatures, left, rowBloomFilterThreshold)
}

@@ -335,6 +336,42 @@ object FeathrClient {
private var featureDefPath: List[String] = List()
private var localOverrideDefPath: List[String] = List()
private var featureDefConfs: List[FeathrConfig] = List()
private var dataPathHandlers: List[DataPathHandler] = List()


/**
* Add a list of data path handlers to the builder. Used to handle accessing and loading paths caught by the user's udf, validatePath.
*
* @param dataPathHandlers custom data path handlers
* @return FeathrClient.Builder
*/
def addDataPathHandlers(dataPathHandlers: List[DataPathHandler]): Builder = {
this.dataPathHandlers = dataPathHandlers ++ this.dataPathHandlers
this
}

/**
* Add a data path handler to the builder. Used to handle accessing and loading paths caught by the user's udf, validatePath.
*
* @param dataPathHandler custom data path handler
* @return FeathrClient.Builder
*/
def addDataPathHandler(dataPathHandler: DataPathHandler): Builder = {
this.dataPathHandlers = dataPathHandler :: this.dataPathHandlers
this
}

/**
* Same as {@code addDataPathHandler(DataPathHandler)}, but the input dataPathHandler is optional; when it is missing,
* this method performs a no-op.
*
* @param dataPathHandler custom data path handler
* @return FeathrClient.Builder
*/
def addDataPathHandler(dataPathHandler: Option[DataPathHandler]): Builder = {
if (dataPathHandler.isDefined) addDataPathHandler(dataPathHandler.get) else this
}


/**
* Add a feature definition config string to the builder.
@@ -492,7 +529,7 @@ object FeathrClient {
featureDefConfigs = featureDefConfigs ++ featureDefConfs

val featureGroups = FeatureGroupsGenerator(featureDefConfigs, Some(localDefConfigs)).getFeatureGroups()
-val feathrClient = new FeathrClient(sparkSession, featureGroups, MultiStageJoinPlanner(), FeatureGroupsUpdater())
+val feathrClient = new FeathrClient(sparkSession, featureGroups, MultiStageJoinPlanner(), FeatureGroupsUpdater(), dataPathHandlers)

feathrClient
}
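
Taken together, the addDataPathHandler overloads above maintain a single dataPathHandlers list on the builder: a list is concatenated in front of the existing handlers, a single handler is prepended, and the Option variant is a no-op for None so call sites never need to branch. A short sketch of those semantics, with the builder and handler values supplied by the caller as placeholders (imports as in the sketch near the top of this page):

// Hedged sketch of the registration semantics; only the builder calls shown in this diff are used.
def registerHandlers(builder: FeathrClient.Builder,
                     handlers: List[DataPathHandler],
                     optionalHandler: Option[DataPathHandler]): FeathrClient.Builder =
  builder
    .addDataPathHandlers(handlers)        // prepends the whole list to the builder's handlers
    .addDataPathHandler(optionalHandler)  // no-op when None, so no branching at the call site
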
@@ -2,14 +2,15 @@ package com.linkedin.feathr.offline.config.location

import com.linkedin.feathr.offline.generation.SparkIOUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.hadoop.mapred.JobConf

case class PathList(paths: List[String]) extends InputLocation {
override def getPath: String = paths.mkString(";")

override def getPathList: List[String] = paths

override def loadDf(ss: SparkSession, dataIOParameters: Map[String, String] = Map()): DataFrame = {
-SparkIOUtils.createUnionDataFrame(getPathList, dataIOParameters)
+SparkIOUtils.createUnionDataFrame(getPathList, dataIOParameters, new JobConf(), List()) // TODO: Add handler support here. Currently there are deserialization issues with adding handlers to the factory builder.
}
}

@@ -2,14 +2,14 @@ package com.linkedin.feathr.offline.config.location

import com.fasterxml.jackson.module.caseclass.annotation.CaseClassDeserialize
import com.linkedin.feathr.offline.generation.SparkIOUtils
-import com.linkedin.feathr.offline.source.dataloader.BatchDataLoader
+import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.codehaus.jackson.annotate.JsonProperty

@CaseClassDeserialize()
case class SimplePath(@JsonProperty("path") path: String) extends InputLocation {
override def loadDf(ss: SparkSession, dataIOParameters: Map[String, String] = Map()): DataFrame = {
-SparkIOUtils.createUnionDataFrame(getPathList, dataIOParameters)
+SparkIOUtils.createUnionDataFrame(getPathList, dataIOParameters, new JobConf(), List()) // The simple path is not responsible for handling custom data loaders.
}

override def getPath: String = path
@@ -10,6 +10,7 @@ import com.linkedin.feathr.offline.derived.strategies.{DerivationStrategies, Row
import com.linkedin.feathr.offline.join.algorithms.{SequentialJoinConditionBuilder, SparkJoinWithJoinCondition}
import com.linkedin.feathr.offline.logical.FeatureGroups
import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import com.linkedin.feathr.sparkcommon.FeatureDerivationFunctionSpark
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -100,11 +101,14 @@ private[offline] object DerivedFeatureEvaluator {

def apply(derivationStrategies: DerivationStrategies): DerivedFeatureEvaluator = new DerivedFeatureEvaluator(derivationStrategies)

-def apply(ss: SparkSession, featureGroups: FeatureGroups): DerivedFeatureEvaluator = {
+def apply(ss: SparkSession,
+featureGroups: FeatureGroups,
+dataPathHandlers: List[DataPathHandler]): DerivedFeatureEvaluator = {
val defaultStrategies = strategies.DerivationStrategies(
new SparkUdfDerivation(),
new RowBasedDerivation(featureGroups.allTypeConfigs),
-new SequentialJoinAsDerivation(ss, featureGroups, SparkJoinWithJoinCondition(SequentialJoinConditionBuilder)))
+new SequentialJoinAsDerivation(ss, featureGroups, SparkJoinWithJoinCondition(SequentialJoinConditionBuilder), dataPathHandlers)
+)
new DerivedFeatureEvaluator(defaultStrategies)
}

@@ -14,6 +14,7 @@ import com.linkedin.feathr.offline.job.FeatureTransformation._
import com.linkedin.feathr.offline.job.{AnchorFeatureGroups, FeatureTransformation, KeyedTransformedResult}
import com.linkedin.feathr.offline.join.algorithms.{JoinType, SeqJoinExplodedJoinKeyColumnAppender, SparkJoinWithJoinCondition}
import com.linkedin.feathr.offline.logical.FeatureGroups
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults
import com.linkedin.feathr.offline.transformation.{AnchorToDataSourceMapper, MvelDefinition}
import com.linkedin.feathr.offline.util.{CoercionUtilsScala, DataFrameSplitterMerger, FeaturizedDatasetUtils, FeathrUtils}
@@ -29,7 +30,10 @@ import scala.collection.mutable
/**
* This class executes Sequential Join as a derivation on base and expansion features.
*/
-private[offline] class SequentialJoinAsDerivation(ss: SparkSession, featureGroups: FeatureGroups, joiner: SparkJoinWithJoinCondition)
+private[offline] class SequentialJoinAsDerivation(ss: SparkSession,
+featureGroups: FeatureGroups,
+joiner: SparkJoinWithJoinCondition,
+dataPathHandlers: List[DataPathHandler])
extends SequentialJoinDerivationStrategy
with Serializable {
@transient private val log = Logger.getLogger(getClass)
@@ -589,7 +593,7 @@ private[offline] class SequentialJoinAsDerivation(ss: SparkSession, featureGroup
seqJoinproducedFeatureName: String): (DataFrame, Seq[String]) = {
val expansionFeatureKeys = (derivedFeature.derivation.asInstanceOf[SeqJoinDerivationFunction].right.key)
val expansionAnchor = allAnchoredFeatures(expansionFeatureName)
-val expandFeatureInfo = getAnchorFeatureDF(allAnchoredFeatures, expansionFeatureName, new AnchorToDataSourceMapper())
+val expandFeatureInfo = getAnchorFeatureDF(allAnchoredFeatures, expansionFeatureName, new AnchorToDataSourceMapper(dataPathHandlers))
val transformedFeatureDF = expandFeatureInfo.transformedResult.df
val expansionAnchorKeyColumnNames = expandFeatureInfo.joinKey
if (expansionFeatureKeys.size != expansionAnchorKeyColumnNames.size) {
@@ -10,6 +10,8 @@ import com.linkedin.feathr.offline.derived.{DerivedFeature, DerivedFeatureEvalua
import com.linkedin.feathr.offline.evaluator.DerivedFeatureGenStage
import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation}
import com.linkedin.feathr.offline.logical.{FeatureGroups, MultiStageJoinPlan}
import com.linkedin.feathr.offline.source.accessor.DataPathHandler
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
import com.linkedin.feathr.offline.transformation.AnchorToDataSourceMapper
import com.linkedin.feathr.offline.util.{AnchorUtils, FeathrUtils}
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -18,9 +20,9 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
* Feature generator that is responsible for generating anchored and derived features.
* @param logicalPlan logical plan for feature generation job.
*/
-private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan) extends Serializable {
+private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan, dataPathHandlers: List[DataPathHandler]) extends Serializable {
@transient val incrementalAggSnapshotLoader = IncrementalAggSnapshotLoader
-@transient val anchorToDataFrameMapper = new AnchorToDataSourceMapper()
+@transient val anchorToDataFrameMapper = new AnchorToDataSourceMapper(dataPathHandlers)
@transient val featureGenFeatureGrouper = FeatureGenFeatureGrouper()
@transient val featureGenDefaultsSubstituter = FeatureGenDefaultsSubstituter()
@transient val postGenPruner = PostGenPruner()
@@ -48,7 +50,8 @@ private[offline] class DataFrameFeatureGenerator(logicalPlan: MultiStageJoinPlan
val allRequiredFeatures = logicalPlan.requiredNonWindowAggFeatures ++ logicalPlan.requiredWindowAggFeatures

// 2. Get AnchorDFMap for Anchored features.
-val incrementalAggContext = incrementalAggSnapshotLoader.load(featureGenSpec)
+val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler)
+val incrementalAggContext = incrementalAggSnapshotLoader.load(featureGenSpec=featureGenSpec, dataLoaderHandlers=dataLoaderHandlers)
val allRequiredFeatureAnchorWithSourceAndTime = allRequiredFeatures
.map(_.getFeatureName)
.filter(featureGroups.allAnchoredFeatures.contains)
@@ -4,6 +4,7 @@ import com.databricks.spark.avro.SchemaConverters
import com.linkedin.feathr.common.configObj.generation.OutputProcessorConfig
import com.linkedin.feathr.common.{Header, RichConfig, TaggedFeatureName}
import com.linkedin.feathr.offline.util.{FeatureGenConstants, HdfsUtils}
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -20,6 +21,7 @@ private[offline] object FeatureDataHDFSProcessUtils {
* @param skipWrite skip the write to HDFS, only convert the dataframe and return
* @param endTimeOpt optional string of end time, in yyyy/MM/dd format
* @param timestampOpt optional string of auto-generated timestamp
* @param dataLoaderHandlers additional data loader handlers that contain hooks for dataframe creation and manipulation
*/
def processFeatureDataHDFS(
ss: SparkSession,
@@ -28,7 +30,8 @@
config: OutputProcessorConfig,
skipWrite: Boolean = false,
endTimeOpt: Option[String],
-timestampOpt: Option[String]): (DataFrame, Header) = {
+timestampOpt: Option[String],
+dataLoaderHandlers: List[DataLoaderHandler]): (DataFrame, Header) = {
// since these features are in same dataframe, they must share same key tag size
assert(groupedFeatureToDF.map(_._1.getKeyTag.size()).toSeq.distinct.size == 1)
// the input should have been grouped, so that there's only one dataframe in the input map
@@ -40,7 +43,7 @@
if (skipWrite) {
(df, header)
} else {
-RawDataWriterUtils.writeFdsDataToDisk(ss, featureHeaderMap, parentPath, outputParts, endTimeOpt, saveSchemaMeta, df, header)
+RawDataWriterUtils.writeFdsDataToDisk(ss, featureHeaderMap, parentPath, outputParts, endTimeOpt, saveSchemaMeta, df, header, dataLoaderHandlers)
}
}

@@ -5,6 +5,7 @@ import com.linkedin.feathr.offline.FeatureName
import com.linkedin.feathr.offline.config.location.SimplePath
import com.linkedin.feathr.offline.job.FeatureGenSpec
import com.linkedin.feathr.offline.source.dataloader.BatchDataLoader
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
import com.linkedin.feathr.offline.util.IncrementalAggUtils
import com.linkedin.feathr.offline.util.datetime.OfflineDateTimeUtils
import org.apache.hadoop.conf.Configuration
@@ -43,8 +44,10 @@ private[offline] trait IncrementalAggSnapshotLoader {
* @param featureGenSpec Feature Generation spec.
* @return Incremental aggregation context.
*/
-def load(featureGenSpec: FeatureGenSpec): IncrementalAggContext = {
-load(featureGenSpec, FileSystem.get(new Configuration()))
+def load(featureGenSpec: FeatureGenSpec, dataLoaderHandlers: List[DataLoaderHandler]): IncrementalAggContext = {
+load(featureGenSpec=featureGenSpec,
+fs=FileSystem.get(new Configuration()),
+dataLoaderHandlers=dataLoaderHandlers)
}

/**
@@ -55,12 +58,12 @@
* @param fs Filesystem in which to look for the previously aggregated results.
* @return Incremental aggregation context.
*/
-private[generation] def load(featureGenSpec: FeatureGenSpec, fs: FileSystem): IncrementalAggContext
+private[generation] def load(featureGenSpec: FeatureGenSpec, fs: FileSystem, dataLoaderHandlers: List[DataLoaderHandler]): IncrementalAggContext
}

private[offline] object IncrementalAggSnapshotLoader extends IncrementalAggSnapshotLoader {
private val logger = Logger.getLogger(getClass)
-private[generation] override def load(featureGenSpec: FeatureGenSpec, fs: FileSystem): IncrementalAggContext = {
+private[generation] override def load(featureGenSpec: FeatureGenSpec, fs: FileSystem, dataLoaderHandlers: List[DataLoaderHandler]): IncrementalAggContext = {
val isIncrementalAggEnabled = featureGenSpec.isEnableIncrementalAgg()
if (!isIncrementalAggEnabled) {
IncrementalAggContext(isIncrementalAggEnabled, None, Map.empty[FeatureName, DataFrame], Map.empty[FeatureName, String])
@@ -84,7 +87,10 @@ private[offline] object IncrementalAggSnapshotLoader extends IncrementalAggSnaps
val endDate = OfflineDateTimeUtils.createTimeFromString(featureGenSpec.endTimeStr, featureGenSpec.endTimeFormat).toLocalDateTime
val directory = IncrementalAggUtils.getLatestAggSnapshotDFPath(preAggRootDir, endDate).get
val spark = SparkSession.builder().getOrCreate()
-val preAggSnapshot = new BatchDataLoader(spark, SimplePath(directory)).loadDataFrame()
+val preAggSnapshot = new BatchDataLoader(ss=spark,
+location=SimplePath(directory),
+dataLoaderHandlers=dataLoaderHandlers
+).loadDataFrame()
val features = params.getStringList(FeatureGenerationPathName.FEATURES).asScala
// user may have added new features in this run
val oldFeatures = features.filter(preAggSnapshot.columns.contains)
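
For readers unfamiliar with the handler concept referenced in the doc comments above (handlers that provide "hooks for dataframe creation and manipulation" and catch paths via a user-supplied validatePath), the following standalone sketch illustrates the general pattern. It is hypothetical and deliberately avoids the Feathr DataPathHandler/DataLoaderHandler types, whose fields are not shown in this diff:

// Hypothetical, standalone illustration of the path-handler hook pattern, not the Feathr API.
// A hook claims certain paths via validatePath and supplies its own DataFrame loader;
// unclaimed paths fall back to a default Spark read.
import org.apache.spark.sql.{DataFrame, SparkSession}

final case class CustomLoaderHook(
    validatePath: String => Boolean,                       // does this hook own the path?
    createDataFrame: (SparkSession, String) => DataFrame)  // custom load for owned paths

def loadWithHooks(ss: SparkSession, path: String, hooks: List[CustomLoaderHook]): DataFrame =
  hooks.find(_.validatePath(path)) match {
    case Some(hook) => hook.createDataFrame(ss, path)      // first matching hook wins
    case None       => ss.read.parquet(path)               // default Spark read as fallback
  }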