From 6f82b6f789b1b77abcaa50e09b870f9710a4d896 Mon Sep 17 00:00:00 2001 From: Jinghui Mo Date: Wed, 28 Sep 2022 16:59:15 -0400 Subject: [PATCH 1/4] 1. In SparkRowExtractor.scala, add new extractor method which can be extended to do batch preprocess source dataframe into RDD[IndexRecord]. 2. In FeatureTransformation.scala, add logic to extract features from RDD[IndexedRecord]. 3. Improve some error messages. --- .../feathr/common/AnchorExtractor.scala | 8 +- .../feathr/common/SparkRowExtractor.scala | 23 +- .../SimpleConfigurableAnchorExtractor.scala | 7 +- .../keyExtractor/MVELSourceKeyExtractor.scala | 7 +- .../offline/config/FeathrConfigLoader.scala | 2 +- .../StreamingFeatureGenerator.scala | 4 +- .../offline/job/FeatureTransformation.scala | 251 +++++++++++++++++- .../DataFrameBasedRowEvaluator.scala | 27 +- .../DefaultValueSubstituter.scala | 2 +- .../transformation/FDSConversionUtils.scala | 2 +- .../util/FeatureValueTypeValidator.scala | 24 +- .../offline/util/FeaturizedDatasetUtils.scala | 2 +- .../util/TestFeatureValueTypeValidator.scala | 4 +- 13 files changed, 308 insertions(+), 55 deletions(-) diff --git a/src/main/scala/com/linkedin/feathr/common/AnchorExtractor.scala b/src/main/scala/com/linkedin/feathr/common/AnchorExtractor.scala index 2e38e4d04..185c9d2d6 100644 --- a/src/main/scala/com/linkedin/feathr/common/AnchorExtractor.scala +++ b/src/main/scala/com/linkedin/feathr/common/AnchorExtractor.scala @@ -1,7 +1,5 @@ package com.linkedin.feathr.common -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema - /** * Provides feature values based on some "raw" data element * @@ -39,12 +37,14 @@ trait AnchorExtractor[T] extends AnchorExtractorBase[T] with SparkRowExtractor { * @param datum input row * @return list of feature keys */ - def getKeyFromRow(datum: GenericRowWithSchema): Seq[String] = getKey(datum.asInstanceOf[T]) + def getKeyFromRow(datum: Any): Seq[String] = getKey(datum.asInstanceOf[T]) /** * Get the feature value from the row * @param datum input row * @return A map of feature name to feature value */ - def getFeaturesFromRow(datum: GenericRowWithSchema): Map[String, FeatureValue] = getFeatures(datum.asInstanceOf[T]) + def getFeaturesFromRow(datum: Any): Map[String, FeatureValue] = getFeatures(datum.asInstanceOf[T]) + + override def toString: String = getClass.getSimpleName } diff --git a/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala b/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala index 04e715e8c..7af9f5b94 100644 --- a/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala +++ b/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala @@ -1,6 +1,8 @@ package com.linkedin.feathr.common -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema +import org.apache.avro.generic.IndexedRecord +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame /** * An extractor trait that provides APIs to transform a Spark GenericRowWithSchema into feature values @@ -12,12 +14,27 @@ trait SparkRowExtractor { * @param datum input row * @return list of feature keys */ - def getKeyFromRow(datum: GenericRowWithSchema): Seq[String] + def getKeyFromRow(datum: Any): Seq[String] /** * Get the feature value from the row * @param datum input row * @return A map of feature name to feature value */ - def getFeaturesFromRow(datum: GenericRowWithSchema): Map[String, FeatureValue] + def getFeaturesFromRow(datum: Any): Map[String, FeatureValue] + + /** + * Whether the Row extractor needs a 
one-time batch preprocessing before calling + * getKeyFromRow() and getFeaturesFromRow(). + * This is especially useful when users need to convert the source DataFrame + * into specific datatype, e.g. Avro GenericRecord or SpecificRecord. + */ + def hasBatchPreProcessing() = false + + /** + * One time batch preprocess the input data source into a RDD[_] for feature extraction later + * @param df input data source + * @return batch preprocessed dataframe, as RDD[IndexedRecord] + */ + def batchPreProcess(df: DataFrame) : RDD[IndexedRecord] = throw new NotImplementedError("Batch preprocess is not implemented") } \ No newline at end of file diff --git a/src/main/scala/com/linkedin/feathr/offline/anchored/anchorExtractor/SimpleConfigurableAnchorExtractor.scala b/src/main/scala/com/linkedin/feathr/offline/anchored/anchorExtractor/SimpleConfigurableAnchorExtractor.scala index 59f5bfbe7..edb2e2c06 100644 --- a/src/main/scala/com/linkedin/feathr/offline/anchored/anchorExtractor/SimpleConfigurableAnchorExtractor.scala +++ b/src/main/scala/com/linkedin/feathr/offline/anchored/anchorExtractor/SimpleConfigurableAnchorExtractor.scala @@ -10,7 +10,6 @@ import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import com.linkedin.feathr.offline.mvel.{MvelContext, MvelUtils} import com.linkedin.feathr.offline.util.FeatureValueTypeValidator import org.apache.log4j.Logger -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types._ import org.mvel2.MVEL @@ -66,7 +65,7 @@ private[offline] class SimpleConfigurableAnchorExtractor( @JsonProperty("key") k * @param datum input row * @return list of feature keys */ - override def getKeyFromRow(datum: GenericRowWithSchema): Seq[String] = { + override def getKeyFromRow(datum: Any): Seq[String] = { getKey(datum.asInstanceOf[Any]) } @@ -107,7 +106,7 @@ private[offline] class SimpleConfigurableAnchorExtractor( @JsonProperty("key") k * @param row input row * @return A map of feature name to feature value */ - override def getFeaturesFromRow(row: GenericRowWithSchema) = { + override def getFeaturesFromRow(row: Any) = { getFeatures(row.asInstanceOf[Any]) } @@ -147,7 +146,7 @@ private[offline] class SimpleConfigurableAnchorExtractor( @JsonProperty("key") k featureTypeConfigs(featureRefStr) } val featureValue = offline.FeatureValue.fromTypeConfig(value, featureTypeConfig) - FeatureValueTypeValidator.validate(featureValue, featureTypeConfigs(featureRefStr)) + FeatureValueTypeValidator.validate(featureRefStr, featureValue, featureTypeConfigs(featureRefStr) ) (featureRefStr, featureValue) } diff --git a/src/main/scala/com/linkedin/feathr/offline/anchored/keyExtractor/MVELSourceKeyExtractor.scala b/src/main/scala/com/linkedin/feathr/offline/anchored/keyExtractor/MVELSourceKeyExtractor.scala index 209ac89e1..438a209b5 100644 --- a/src/main/scala/com/linkedin/feathr/offline/anchored/keyExtractor/MVELSourceKeyExtractor.scala +++ b/src/main/scala/com/linkedin/feathr/offline/anchored/keyExtractor/MVELSourceKeyExtractor.scala @@ -43,7 +43,7 @@ private[feathr] class MVELSourceKeyExtractor(val anchorExtractorV1: AnchorExtrac .toDF() } - def getKey(datum: GenericRowWithSchema): Seq[String] = { + def getKey(datum: Any): Seq[String] = { anchorExtractorV1.getKeyFromRow(datum) } @@ -55,7 +55,7 @@ private[feathr] class MVELSourceKeyExtractor(val anchorExtractorV1: AnchorExtrac */ override def getKeyColumnNames(datum: Option[Any]): Seq[String] = { if (datum.isDefined) { - val size = 
getKey(datum.get.asInstanceOf[GenericRowWithSchema]).size + val size = getKey(datum.get).size (1 to size).map(JOIN_KEY_PREFIX + _) } else { // return empty join key to signal empty dataset @@ -86,5 +86,6 @@ private[feathr] class MVELSourceKeyExtractor(val anchorExtractorV1: AnchorExtrac // this helps to reduce the number of joins // to the observation data // The default toString does not work, because toString of each object have different values - override def toString: String = getClass.getSimpleName + " with keyExprs:" + keyExprs.mkString(" key:") + override def toString: String = getClass.getSimpleName + " with keyExprs:" + keyExprs.mkString(" key:") + + anchorExtractorV1.toString } diff --git a/src/main/scala/com/linkedin/feathr/offline/config/FeathrConfigLoader.scala b/src/main/scala/com/linkedin/feathr/offline/config/FeathrConfigLoader.scala index 1faf0d814..e2ec6e588 100644 --- a/src/main/scala/com/linkedin/feathr/offline/config/FeathrConfigLoader.scala +++ b/src/main/scala/com/linkedin/feathr/offline/config/FeathrConfigLoader.scala @@ -327,7 +327,7 @@ private[offline] class AnchorLoader extends JsonDeserializer[FeatureAnchor] { case Some(tType) => offline.FeatureValue.fromTypeConfig(rawValue, tType) case None => offline.FeatureValue(rawValue, featureType, key) } - FeatureValueTypeValidator.validate(featureValue, featureTypeConfig) + FeatureValueTypeValidator.validate(featureValue, featureTypeConfig, key) (key, featureValue) } .toMap diff --git a/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala b/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala index 99436b93c..126128323 100644 --- a/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala +++ b/src/main/scala/com/linkedin/feathr/offline/generation/StreamingFeatureGenerator.scala @@ -6,7 +6,7 @@ import com.linkedin.feathr.common.JoiningFeatureParams import com.linkedin.feathr.offline.config.location.KafkaEndpoint import com.linkedin.feathr.offline.generation.outputProcessor.PushToRedisOutputProcessor.TABLE_PARAM_CONFIG_NAME import com.linkedin.feathr.offline.generation.outputProcessor.RedisOutputUtils -import com.linkedin.feathr.offline.job.FeatureTransformation.getFeatureJoinKey +import com.linkedin.feathr.offline.job.FeatureTransformation.getFeatureKeyColumnNames import com.linkedin.feathr.offline.job.{FeatureGenSpec, FeatureTransformation} import com.linkedin.feathr.offline.logical.FeatureGroups import com.linkedin.feathr.offline.source.accessor.DataPathHandler @@ -111,7 +111,7 @@ class StreamingFeatureGenerator(dataPathHandlers: List[DataPathHandler]) { // Apply feature transformation val transformedResult = DataFrameBasedSqlEvaluator.transform(anchor.featureAnchor.extractor.asInstanceOf[SimpleAnchorExtractorSpark], withKeyColumnDF, featureNamePrefixPairs, anchor.featureAnchor.featureTypeConfigs) - val outputJoinKeyColumnNames = getFeatureJoinKey(keyExtractor, withKeyColumnDF) + val outputJoinKeyColumnNames = getFeatureKeyColumnNames(keyExtractor, withKeyColumnDF) val selectedColumns = outputJoinKeyColumnNames ++ anchor.selectedFeatures.filter(keyTaggedFeatures.map(_.featureName).contains(_)) val cleanedDF = transformedResult.df.select(selectedColumns.head, selectedColumns.tail:_*) val keyColumnNames = FeatureTransformation.getStandardizedKeyNames(outputJoinKeyColumnNames.size) diff --git a/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala 
b/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala index 94de8e645..c223e93b6 100644 --- a/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala +++ b/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala @@ -1,7 +1,9 @@ package com.linkedin.feathr.offline.job -import com.linkedin.feathr.common._ import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrException, FeathrFeatureTransformationException} +import com.linkedin.feathr.common.tensor.TensorData +import com.linkedin.feathr.common.types.FeatureType +import com.linkedin.feathr.common.{AnchorExtractorBase, _} import com.linkedin.feathr.offline.anchored.anchorExtractor.{SQLConfigurableAnchorExtractor, SimpleConfigurableAnchorExtractor, TimeWindowConfigurableAnchorExtractor} import com.linkedin.feathr.offline.anchored.feature.{FeatureAnchor, FeatureAnchorWithSource} import com.linkedin.feathr.offline.anchored.keyExtractor.MVELSourceKeyExtractor @@ -22,6 +24,7 @@ import com.linkedin.feathr.offline.{FeatureDataFrame, JoinKeys} import com.linkedin.feathr.sparkcommon.{SimpleAnchorExtractorSpark, SourceKeyExtractor} import com.linkedin.feathr.swj.aggregate.AggregationType import com.linkedin.feathr.{common, offline} +import org.apache.avro.generic.IndexedRecord import org.apache.log4j.Logger import org.apache.spark.rdd.RDD import org.apache.spark.sql.functions._ @@ -75,7 +78,27 @@ private[offline] object FeatureTransformation { // feature name, column prefix type FeatureNameAndColumnPrefix = (String, String) - def getFeatureJoinKey(sourceKeyExtractor: SourceKeyExtractor, withKeyColumnDF: DataFrame, featureExtractor: Option[AnyRef] = None): Seq[String] = { + /** + * Extract feature key column names from the input feature RDD using the sourceKeyExtractor. + * @param sourceKeyExtractor key extractor that knows what are the key column in a feature RDD. + * @param withKeyColumnRDD RDD that contains the key columns. + * @return feature key column names + */ + def getFeatureKeyColumnNamesRdd(sourceKeyExtractor: SourceKeyExtractor, withKeyColumnRDD: RDD[_]): Seq[String] = { + if (withKeyColumnRDD.isEmpty) { + sourceKeyExtractor.getKeyColumnNames(None) + } else { + sourceKeyExtractor.getKeyColumnNames(Some(withKeyColumnRDD.first())) + } + } + + /** + * Extract feature key column names from the input feature DataFrame using the sourceKeyExtractor. + * @param sourceKeyExtractor key extractor that knows what are the key column in a feature RDD. + * @param withKeyColumnDF DataFrame that contains the key columns. + * @return feature key column names + */ + def getFeatureKeyColumnNames(sourceKeyExtractor: SourceKeyExtractor, withKeyColumnDF: DataFrame): Seq[String] = { if (withKeyColumnDF.head(1).isEmpty) { sourceKeyExtractor.getKeyColumnNames(None) } else { @@ -306,7 +329,8 @@ private[offline] object FeatureTransformation { } val withKeyColumnDF = keyExtractor.appendKeyColumns(sourceDF) - val outputJoinKeyColumnNames = getFeatureJoinKey(keyExtractor, withKeyColumnDF, Some(anchorFeatureGroup.anchorsWithSameSource.head.featureAnchor.extractor)) + + val outputJoinKeyColumnNames = getFeatureKeyColumnNames(keyExtractor, withKeyColumnDF) val filteredFactData = applyBloomFilter((keyExtractor, withKeyColumnDF), bloomFilter) // 1. 
apply all transformations on the dataframe in sequential order @@ -457,10 +481,23 @@ private[offline] object FeatureTransformation { val keyExtractor = anchorsWithSameSource.head._1.featureAnchor.sourceKeyExtractor val featureAnchorWithSource = anchorsWithSameSource.keys.toSeq val selectedFeatures = anchorsWithSameSource.flatMap(_._2.featureNames).toSeq - - val sourceDF = featureGroupingFactors.source - val transformedResults: Seq[KeyedTransformedResult] = transformMultiAnchorsOnSingleDataFrame(sourceDF, + val isAvroBasedExtractor = featureAnchorWithSource + .map(_.featureAnchor.extractor) + .filter(extractor => + extractor.isInstanceOf[AnchorExtractor[Any]] && + classOf[IndexedRecord].isAssignableFrom(extractor.asInstanceOf[AnchorExtractor[Any]].getInputType) + ).nonEmpty + val transformedResults: Seq[KeyedTransformedResult] = if (isAvroBasedExtractor) { + // If there are features are defined using AVRO record based extractor, run RDD based feature transformation + val sourceAccessor = featureGroupingFactors.source + val sourceRdd = sourceAccessor.asInstanceOf[NonTimeBasedDataSourceAccessor].get() + val featureTypeConfigs = featureAnchorWithSource.flatMap(featureAnchor => featureAnchor.featureAnchor.featureTypeConfigs).toMap + Seq(transformFeaturesOnAvroRecord(sourceRdd, keyExtractor, featureAnchorWithSource, bloomFilter, selectedFeatures, featureTypeConfigs)) + } else { + val sourceDF = featureGroupingFactors.source + transformFeaturesOnDataFrameRow(sourceDF, keyExtractor, featureAnchorWithSource, bloomFilter, selectedFeatures, incrementalAggContext, mvelContext) + } val res = transformedResults .map { transformedResultWithKey => @@ -673,6 +710,204 @@ private[offline] object FeatureTransformation { } } + + /** + * Apply a bloomfilter to a RDD + * + * @param keyExtractor key extractor to extract the key values from the RDD + * @param rdd RDD to filter + * @param bloomFilter bloomfilter used to filter out unwanted row in the RDD based on key columns + * @return filtered RDD + */ + + private def applyBloomFilterRdd(keyExtractor: SourceKeyExtractor, rdd: RDD[IndexedRecord], bloomFilter: Option[BloomFilter]): RDD[IndexedRecord] = { + bloomFilter match { + case None => + // no bloom filter, use data as it + rdd + case Some(filter) => + // get the list of join key columns or expression + keyExtractor match { + case extractor: MVELSourceKeyExtractor => + // get the list of join key columns or expression + val keyColumnsList = if (rdd.isEmpty) { + extractor.getKeyColumnNames(None) + } else { + extractor.getKeyColumnNames(Some(rdd.first)) + } + if (!keyColumnsList.isEmpty) { + val filtered = rdd.filter { record: Any => + val keyVals = extractor.getKey(record) + // if key is not in observation, skip it + if (keyVals != null && keyVals.count(_ == null) == 0) { + filter.mightContainString(SourceUtils.generateFilterKeyString(keyVals)) + } else { + false + } + } + filtered + } else { + // expand feature for seq join does not have right key, so we allow empty here + rdd + } + case _ => throw new FeathrFeatureTransformationException(ErrorLabel.FEATHR_USER_ERROR, "No source key extractor found") + } + } + } + + /** + * Transform features defined in a group of anchors based on same source + * This is for the AVRO record based extractors + * + * @param rdd source that requested features are defined on + * @param keyExtractor key extractor to apply on source rdd + * @param featureAnchorWithSources feature anchors defined on source rdd to be evaluated + * @param bloomFilter bloomfilter to apply on source rdd 
+ * @param requestedFeatureNames requested features + * @param featureTypeConfigs user specified feature types + * @return TransformedResultWithKey + */ + private def transformFeaturesOnAvroRecord(df: DataFrame, + keyExtractor: SourceKeyExtractor, + featureAnchorWithSources: Seq[FeatureAnchorWithSource], + bloomFilter: Option[BloomFilter], + requestedFeatureNames: Seq[FeatureName], + featureTypeConfigs: Map[String, FeatureTypeConfig] = Map()): KeyedTransformedResult = { + if (!keyExtractor.isInstanceOf[MVELSourceKeyExtractor]) { + throw new FeathrException(ErrorLabel.FEATHR_ERROR, s"Error processing requested Feature :${requestedFeatureNames}. " + + s"Key extractor ${keyExtractor} must extends MVELSourceKeyExtractor.") + } + val extractor = keyExtractor.asInstanceOf[MVELSourceKeyExtractor] + if (!extractor.anchorExtractorV1.hasBatchPreProcessing()) { + throw new FeathrException(ErrorLabel.FEATHR_ERROR, s"Error processing requested Feature :${requestedFeatureNames}. " + + s"Missing batch preprocessors.") + } + val rdd = extractor.anchorExtractorV1.batchPreProcess(df) + val filteredFactData = applyBloomFilterRdd(keyExtractor, rdd, bloomFilter) + + // Build a sequence of 3-tuple of (FeatureAnchorWithSource, featureNamePrefixPairs, AnchorExtractorBase) + val transformInfo = featureAnchorWithSources map { featureAnchorWithSource => + val extractor = featureAnchorWithSource.featureAnchor.extractor + extractor match { + case transformer: AnchorExtractorBase[IndexedRecord] => + // We no longer need prefix for the simplicity of the implementation, instead if there's a feature name + // and source data field clash, we will throw exception and ask user to rename the feature. + val featureNamePrefix = "" + val featureNames = featureAnchorWithSource.selectedFeatures.filter(requestedFeatureNames.contains) + val featureNamePrefixPairs = featureNames.map((_, featureNamePrefix)) + (featureAnchorWithSource, featureNamePrefixPairs, transformer) + + case _ => + throw new FeathrFeatureTransformationException(ErrorLabel.FEATHR_USER_ERROR, s"Unsupported transformer $extractor for features: $requestedFeatureNames") + } + } + + // to avoid name conflict between feature names and the raw data field names + val sourceKeyExtractors = transformInfo.map(_._1.featureAnchor.sourceKeyExtractor) + assert(sourceKeyExtractors.map(_.toString).distinct.size == 1) + + val transformers = transformInfo map (_._3) + + /* + * Transform the given RDD by applying extractors to each row to create an RDD[Row] where each Row + * represents keys and feature values + */ + val spark = SparkSession.builder().getOrCreate() + val userProvidedFeatureTypes = transformInfo.flatMap(_._1.featureAnchor.getFeatureTypes.getOrElse(Map.empty[String, FeatureTypes])).toMap + val FeatureTypeInferenceContext(featureTypeAccumulators) = + FeatureTransformation.getTypeInferenceContext(spark, userProvidedFeatureTypes, requestedFeatureNames) + val transformedRdd = filteredFactData map { record => + val (keys, featureValuesWithType) = transformAvroRecord(requestedFeatureNames, sourceKeyExtractors, transformers, record, featureTypeConfigs) + requestedFeatureNames.zip(featureValuesWithType).foreach { + case (featureRef, (_, featureType)) => + if (featureTypeAccumulators(featureRef).isZero && featureType != null) { + // This is lazy evaluated + featureTypeAccumulators(featureRef).add(FeatureTypes.valueOf(featureType.getBasicType.toString)) + } + } + // Create a row by merging a row created from keys and a row created from term-vectors/tensors + 
Row.merge(Row.fromSeq(keys), Row.fromSeq(featureValuesWithType.map(_._1))) + } + + // Create a DataFrame from the above obtained RDD + val keyNames = getFeatureKeyColumnNamesRdd(sourceKeyExtractors.head, filteredFactData) + val (outputSchema, inferredFeatureTypeConfigs) = { + val allFeatureTypeConfigs = featureAnchorWithSources.flatMap(featureAnchorWithSource => featureAnchorWithSource.featureAnchor.featureTypeConfigs).toMap + val inferredFeatureTypes = inferFeatureTypes(featureTypeAccumulators, transformedRdd, requestedFeatureNames) + val inferredFeatureTypeConfigs = inferredFeatureTypes.map(x => x._1 -> new FeatureTypeConfig(x._2)) + val mergedFeatureTypeConfig = inferredFeatureTypeConfigs ++ allFeatureTypeConfigs + val colPrefix = "" + val featureTensorTypeInfo = getFDSSchemaFields(requestedFeatureNames, mergedFeatureTypeConfig, colPrefix) + val structFields = keyNames.foldRight(List.empty[StructField]) { + case (colName, acc) => + StructField(colName, StringType) :: acc + } + val outputSchema = StructType(StructType(structFields ++ featureTensorTypeInfo)) + (outputSchema, mergedFeatureTypeConfig) + } + val transformedDF = spark.createDataFrame(transformedRdd, outputSchema) + + val featureFormat = FeatureColumnFormat.FDS_TENSOR + val featureColumnFormats = requestedFeatureNames.map(name => name -> featureFormat).toMap + val transformedInfo = TransformedResult(transformInfo.flatMap(_._2), transformedDF, featureColumnFormats, inferredFeatureTypeConfigs) + KeyedTransformedResult(keyNames, transformedInfo) + } + + /** + * Apply a keyExtractor and feature transformer on a Record to extractor feature values. + * @param requestedFeatureNames requested feature names in the output. Extractors may produce more features than requested. + * @param sourceKeyExtractors extractor to extract the key from the record + * @param transformers transform to produce the feature value from the record + * @param record avro record to work on + * @param featureTypeConfigs user defined feature types + * @return tuple of (feature join key, sequence of (feature value, feature type) in the order of requestedFeatureNames) + */ + private def transformAvroRecord( + requestedFeatureNames: Seq[FeatureName], + sourceKeyExtractors: Seq[SourceKeyExtractor], + transformers: Seq[AnchorExtractorBase[IndexedRecord]], + record: IndexedRecord, + featureTypeConfigs: Map[String, FeatureTypeConfig] = Map()): (Seq[String], Seq[(Any, FeatureType)]) = { + val keys = sourceKeyExtractors.head match { + case mvelSourceKeyExtractor: MVELSourceKeyExtractor => mvelSourceKeyExtractor.getKey(record) + case _ => throw new FeathrFeatureTransformationException(ErrorLabel.FEATHR_USER_ERROR, s"${sourceKeyExtractors.head} is not a valid extractor on RDD") + } + + /* + * For the given row, apply all extractors to extract feature values. If requested as tensors, each feature value + * contains a tensor else a term-vector. 
+ */ + val features = transformers map { + case extractor: AnchorExtractor[IndexedRecord] => + val features = extractor.getFeatures(record) + FeatureValueTypeValidator.validate(features, featureTypeConfigs) + features + case extractor => + throw new FeathrFeatureTransformationException( + ErrorLabel.FEATHR_USER_ERROR, + s"Invalid extractor $extractor for features:" + + s"$requestedFeatureNames requested as tensors") + } reduce (_ ++ _) + if (logger.isTraceEnabled) { + logger.trace(s"Extracted features: $features") + } + + /* + * Retain feature values for only the requested features, and represent each feature value as a term-vector or as + * a tensor, as specified. If tensors are required, create a row for each feature value (that is, the tensor). + */ + val featureValuesWithType = requestedFeatureNames map { name => + features.get(name) map { + case featureValue => + val tensorData: TensorData = featureValue.getAsTensorData() + val featureType: FeatureType = featureValue.getFeatureType() + val row = FeaturizedDatasetUtils.tensorToFDSDataFrameRow(tensorData) + (row, featureType) + } getOrElse ((null, null)) // return null if no feature value present + } + (keys, featureValuesWithType) + } + /** * Helper function to be used by groupFeatures. Given a collection of feature anchors which also contains information about grouping * criteria and extractor type per feature anchor, returns a map of FeatureGroupingCriteria to @@ -851,7 +1086,7 @@ private[offline] object FeatureTransformation { * others use direct aggregation * */ - private def transformMultiAnchorsOnSingleDataFrame( + private def transformFeaturesOnDataFrameRow( source: DataSourceAccessor, keyExtractor: SourceKeyExtractor, anchorsWithSameSource: Seq[FeatureAnchorWithSource], @@ -878,7 +1113,7 @@ private[offline] object FeatureTransformation { val incrAggCtx = incrementalAggContext.get val preAggDFs = incrAggCtx.previousSnapshotMap.collect { case (featureName, df) if requestedFeatures.exists(df.columns.contains) => df }.toSeq.distinct // join each previous aggregation dataframe sequentially - val groupKeys = getFeatureJoinKey(keyExtractor, preAggDFs.head) + val groupKeys = getFeatureKeyColumnNames(keyExtractor, preAggDFs.head) val keyColumnNames = getStandardizedKeyNames(groupKeys.size) val firstPreAgg = preAggDFs.head val joinedPreAggDFs = preAggDFs diff --git a/src/main/scala/com/linkedin/feathr/offline/transformation/DataFrameBasedRowEvaluator.scala b/src/main/scala/com/linkedin/feathr/offline/transformation/DataFrameBasedRowEvaluator.scala index d242372bf..cc6cba1c7 100644 --- a/src/main/scala/com/linkedin/feathr/offline/transformation/DataFrameBasedRowEvaluator.scala +++ b/src/main/scala/com/linkedin/feathr/offline/transformation/DataFrameBasedRowEvaluator.scala @@ -74,19 +74,20 @@ private[offline] object DataFrameBasedRowEvaluator { val featureTypes = featureTypeConfigs.mapValues(_.getFeatureType) val FeatureTypeInferenceContext(featureTypeAccumulators) = FeatureTransformation.getTypeInferenceContext(spark, featureTypes, featureRefStrs) + val transformedRdd = inputDF.rdd.map(row => { - // in some cases, the input dataframe row here only have Row and does not have schema attached, - // while MVEL only works with GenericRowWithSchema, create it manually - val rowWithSchema = if (row.isInstanceOf[GenericRowWithSchema]) { - row.asInstanceOf[GenericRowWithSchema] - } else { - new GenericRowWithSchema(row.toSeq.toArray, inputSchema) - } - if (rowExtractor.isInstanceOf[SimpleConfigurableAnchorExtractor]) { - 
rowExtractor.asInstanceOf[SimpleConfigurableAnchorExtractor].mvelContext = mvelContext - } - val result = rowExtractor.getFeaturesFromRow(rowWithSchema) - val featureValues = featureRefStrs map { + // in some cases, the input dataframe row here only have Row and does not have schema attached, + // while MVEL only works with GenericRowWithSchema, create it manually + val rowWithSchema = if (row.isInstanceOf[GenericRowWithSchema]) { + row.asInstanceOf[GenericRowWithSchema] + } else { + new GenericRowWithSchema(row.toSeq.toArray, inputSchema) + } + if (rowExtractor.isInstanceOf[SimpleConfigurableAnchorExtractor]) { + rowExtractor.asInstanceOf[SimpleConfigurableAnchorExtractor].mvelContext = mvelContext + } + val result = rowExtractor.getFeaturesFromRow(rowWithSchema) + val featureValues = featureRefStrs map { featureRef => if (result.contains(featureRef)) { val featureValue = result(featureRef) @@ -95,7 +96,7 @@ private[offline] object DataFrameBasedRowEvaluator { featureTypeAccumulators(featureRef).add(FeatureTypes.valueOf(rowFeatureType.toString)) } val tensorData: TensorData = featureValue.getAsTensorData() - FeaturizedDatasetUtils.tensorToDataFrameRow(tensorData) + FeaturizedDatasetUtils.tensorToFDSDataFrameRow(tensorData) } else null } Row.merge(row, Row.fromSeq(featureValues)) diff --git a/src/main/scala/com/linkedin/feathr/offline/transformation/DefaultValueSubstituter.scala b/src/main/scala/com/linkedin/feathr/offline/transformation/DefaultValueSubstituter.scala index 366967cc2..1b67d9558 100644 --- a/src/main/scala/com/linkedin/feathr/offline/transformation/DefaultValueSubstituter.scala +++ b/src/main/scala/com/linkedin/feathr/offline/transformation/DefaultValueSubstituter.scala @@ -112,7 +112,7 @@ private[offline] object DataFrameDefaultValueSubstituter extends DataFrameDefaul // For tensor default, since we don't have type, so we need to use expr to construct the default column val schema = field.dataType val tensorData = defaultFeatureValue.getAsTensorData - val ts = FeaturizedDatasetUtils.tensorToDataFrameRow(tensorData) + val ts = FeaturizedDatasetUtils.tensorToFDSDataFrameRow(tensorData) val fdsTensorDefaultUDF = getFDSTensorDefaultUDF(schema, ts) ss.udf.register("tz_udf", fdsTensorDefaultUDF) expr(s"tz_udf($featureColumnName)") diff --git a/src/main/scala/com/linkedin/feathr/offline/transformation/FDSConversionUtils.scala b/src/main/scala/com/linkedin/feathr/offline/transformation/FDSConversionUtils.scala index 824f48fe3..96a10a67c 100644 --- a/src/main/scala/com/linkedin/feathr/offline/transformation/FDSConversionUtils.scala +++ b/src/main/scala/com/linkedin/feathr/offline/transformation/FDSConversionUtils.scala @@ -37,7 +37,7 @@ private[offline] object FDSConversionUtils { // convert the "raw" input data into a FDS column a specific dataType rawFeatureValue match { case tensorData: TensorData => - FeaturizedDatasetUtils.tensorToDataFrameRow(tensorData, Some(targetDataType)) + FeaturizedDatasetUtils.tensorToFDSDataFrameRow(tensorData, Some(targetDataType)) case _ => targetDataType match { // Scalar tensor diff --git a/src/main/scala/com/linkedin/feathr/offline/util/FeatureValueTypeValidator.scala b/src/main/scala/com/linkedin/feathr/offline/util/FeatureValueTypeValidator.scala index ee06f3acd..aec0b1aea 100644 --- a/src/main/scala/com/linkedin/feathr/offline/util/FeatureValueTypeValidator.scala +++ b/src/main/scala/com/linkedin/feathr/offline/util/FeatureValueTypeValidator.scala @@ -16,7 +16,7 @@ private[offline] object FeatureValueTypeValidator { features.foreach { case 
(key, value) => featureTypeConfigs.get(key).foreach( - featureTypeConfig => FeatureValueTypeValidator.validate(value, featureTypeConfig)) + featureTypeConfig => FeatureValueTypeValidator.validate(key, value, featureTypeConfig)) } } @@ -27,9 +27,9 @@ private[offline] object FeatureValueTypeValidator { * @param featureValue value extracted from data * @param featureTypeConfig user-defined config, optional */ - def validate(featureValue: FeatureValue, featureTypeConfig: Option[FeatureTypeConfig]): Unit = { + def validate(featureValue: FeatureValue, featureTypeConfig: Option[FeatureTypeConfig], featureName: String): Unit = { featureTypeConfig match { - case Some(f) => validate(featureValue, f) + case Some(f) => validate(featureName, featureValue, f) case None => } } @@ -41,31 +41,31 @@ private[offline] object FeatureValueTypeValidator { * @param featureValue value extracted from data * @param featureTypeConfig user-defined config */ - def validate(featureValue: FeatureValue, featureTypeConfig: FeatureTypeConfig): Unit = { + def validate(featureName: String, featureValue: FeatureValue, featureTypeConfig: FeatureTypeConfig): Unit = { val configFeatureTypes = featureTypeConfig.getFeatureType val valueBasicType = featureValue.getFeatureType.getBasicType if (configFeatureTypes != FeatureTypes.UNSPECIFIED) { if (valueBasicType != FeatureType.BasicType.TENSOR || configFeatureTypes != FeatureTypes.TENSOR) { if (configFeatureTypes != FeatureTypes.valueOf(valueBasicType.name)) { - throw new FeathrException(ErrorLabel.FEATHR_USER_ERROR, "The FeatureValue type: " + valueBasicType - + " is not consistent with the type specified in the Feathr config: ." + configFeatureTypes); + throw new FeathrException(ErrorLabel.FEATHR_USER_ERROR, "The FeatureValue type of : " + featureName + + " is " + valueBasicType + ", which is not consistent with the type specified in the Feathr config: ." + configFeatureTypes); } } else if (featureTypeConfig.getTensorType != null) { val configTensorType = featureTypeConfig.getTensorType val valueTensorType = featureValue.getAsTypedTensor.getType if (configTensorType.getValueType != null && configTensorType.getValueType != valueTensorType.getValueType) { - throw new FeathrException(ErrorLabel.FEATHR_USER_ERROR, "The tensor value type: " + valueTensorType - + " is not consistent with the type specified in the Feathr config: ." + configTensorType); + throw new FeathrException(ErrorLabel.FEATHR_USER_ERROR, "The tensor value type of :" + featureName + + " is " + valueTensorType + ", which is not consistent with the type specified in the Feathr config: ." + configTensorType); } if (configTensorType.getTensorCategory != null && configTensorType.getTensorCategory != valueTensorType.getTensorCategory) { - throw new FeathrException(ErrorLabel.FEATHR_USER_ERROR, "The tensor category type: " + valueTensorType - + " is not consistent with the type specified in the Feathr config: ." + configTensorType); + throw new FeathrException(ErrorLabel.FEATHR_USER_ERROR, "The tensor category type of : " + featureName + " is " + + valueTensorType + ", which is not consistent with the type specified in the Feathr config: ." + configTensorType); } if (configTensorType.getDimensionTypes != null && configTensorType.getDimensionTypes != valueTensorType.getDimensionTypes) { - throw new FeathrException(ErrorLabel.FEATHR_USER_ERROR, "The tensor dimension type: " + valueTensorType - + " is not consistent with the type specified in the Feathr config: ." 
+ configTensorType); + throw new FeathrException(ErrorLabel.FEATHR_USER_ERROR, "The tensor dimension type of : " + featureName + " is " + + valueTensorType + ", which is not consistent with the type specified in the Feathr config: ." + configTensorType); } } } diff --git a/src/main/scala/com/linkedin/feathr/offline/util/FeaturizedDatasetUtils.scala b/src/main/scala/com/linkedin/feathr/offline/util/FeaturizedDatasetUtils.scala index d672cf5f5..534881f7a 100644 --- a/src/main/scala/com/linkedin/feathr/offline/util/FeaturizedDatasetUtils.scala +++ b/src/main/scala/com/linkedin/feathr/offline/util/FeaturizedDatasetUtils.scala @@ -157,7 +157,7 @@ private[offline] object FeaturizedDatasetUtils { * @return the Quince-FDS struct or primitive */ - def tensorToDataFrameRow(tensor: TensorData, targetDataType: Option[DataType] = None): Any = { + def tensorToFDSDataFrameRow(tensor: TensorData, targetDataType: Option[DataType] = None): Any = { tensor match { case null => null case _ => diff --git a/src/test/scala/com/linkedin/feathr/offline/util/TestFeatureValueTypeValidator.scala b/src/test/scala/com/linkedin/feathr/offline/util/TestFeatureValueTypeValidator.scala index 1e9bae9b7..bda25b1cc 100644 --- a/src/test/scala/com/linkedin/feathr/offline/util/TestFeatureValueTypeValidator.scala +++ b/src/test/scala/com/linkedin/feathr/offline/util/TestFeatureValueTypeValidator.scala @@ -45,7 +45,7 @@ class TestFeatureValueTypeValidator extends TestFeathr { new FeatureValue(value, valueFeatureType.asInstanceOf[FeatureTypes]); } val featureTypeConfig = new FeatureTypeConfig(configFeatureTypes.asInstanceOf[FeatureTypes], configTensorType.asInstanceOf[TensorType], null) - FeatureValueTypeValidator.validate(featureValue, featureTypeConfig) + FeatureValueTypeValidator.validate("", featureValue, featureTypeConfig) } @DataProvider(name = "failTestCases") @@ -75,7 +75,7 @@ class TestFeatureValueTypeValidator extends TestFeathr { new FeatureValue(value, valueFeatureType.asInstanceOf[FeatureTypes]); } val featureTypeConfig = new FeatureTypeConfig(configFeatureTypes.asInstanceOf[FeatureTypes], configTensorType.asInstanceOf[TensorType], null) - FeatureValueTypeValidator.validate(featureValue, featureTypeConfig) + FeatureValueTypeValidator.validate("", featureValue, featureTypeConfig) } From 6991bf480817c37cc6e92d21dbaf120d04acd2a3 Mon Sep 17 00:00:00 2001 From: Jinghui Mo Date: Mon, 3 Oct 2022 21:13:33 -0400 Subject: [PATCH 2/4] Minor code refactor --- .../feathr/common/SparkRowExtractor.scala | 6 ++-- .../offline/job/FeatureTransformation.scala | 32 ++++++++++++------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala b/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala index 7af9f5b94..7664868cd 100644 --- a/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala +++ b/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala @@ -29,12 +29,12 @@ trait SparkRowExtractor { * This is especially useful when users need to convert the source DataFrame * into specific datatype, e.g. Avro GenericRecord or SpecificRecord. 
*/ - def hasBatchPreProcessing() = false + def isLowLevelRddExtractor() = false /** - * One time batch preprocess the input data source into a RDD[_] for feature extraction later + * One time batch preprocess the input data source into a RDD[IndexedRecord] for feature extraction later * @param df input data source * @return batch preprocessed dataframe, as RDD[IndexedRecord] */ - def batchPreProcess(df: DataFrame) : RDD[IndexedRecord] = throw new NotImplementedError("Batch preprocess is not implemented") + def convertToAvroRdd(df: DataFrame) : RDD[IndexedRecord] = throw new NotImplementedError("Batch preprocess is not implemented") } \ No newline at end of file diff --git a/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala b/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala index c223e93b6..885d895b7 100644 --- a/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala +++ b/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala @@ -44,6 +44,16 @@ import scala.concurrent.{Await, ExecutionContext, Future} */ private[offline] case class AnchorFeatureGroups(anchorsWithSameSource: Seq[FeatureAnchorWithSource], requestedFeatures: Seq[String]) +/** + * Context info needed in feature transformation + * @param featureAnchorWithSource feature annchor with its source + * @param featureNamePrefixPairs map of feature name to its prefix + * @param transformer transformer of anchor + */ +private[offline] case class TransformInfo(featureAnchorWithSource: FeatureAnchorWithSource, + featureNamePrefixPairs: Seq[(FeatureName, FeatureName)], + transformer: AnchorExtractorBase[IndexedRecord]) + /** * Represent the transformed result of an anchor extractor after evaluating its features * @param featureNameAndPrefixPairs pairs of feature name and feature name prefix @@ -765,7 +775,7 @@ private[offline] object FeatureTransformation { * @param bloomFilter bloomfilter to apply on source rdd * @param requestedFeatureNames requested features * @param featureTypeConfigs user specified feature types - * @return TransformedResultWithKey + * @return TransformedResultWithKey The output feature DataFrame conforms to FDS format */ private def transformFeaturesOnAvroRecord(df: DataFrame, keyExtractor: SourceKeyExtractor, @@ -778,11 +788,11 @@ private[offline] object FeatureTransformation { s"Key extractor ${keyExtractor} must extends MVELSourceKeyExtractor.") } val extractor = keyExtractor.asInstanceOf[MVELSourceKeyExtractor] - if (!extractor.anchorExtractorV1.hasBatchPreProcessing()) { + if (!extractor.anchorExtractorV1.isLowLevelRddExtractor()) { throw new FeathrException(ErrorLabel.FEATHR_ERROR, s"Error processing requested Feature :${requestedFeatureNames}. 
" + - s"Missing batch preprocessors.") + s"isLowLevelRddExtractor() should return true and convertToAvroRdd should be implemented.") } - val rdd = extractor.anchorExtractorV1.batchPreProcess(df) + val rdd = extractor.anchorExtractorV1.convertToAvroRdd(df) val filteredFactData = applyBloomFilterRdd(keyExtractor, rdd, bloomFilter) // Build a sequence of 3-tuple of (FeatureAnchorWithSource, featureNamePrefixPairs, AnchorExtractorBase) @@ -795,7 +805,7 @@ private[offline] object FeatureTransformation { val featureNamePrefix = "" val featureNames = featureAnchorWithSource.selectedFeatures.filter(requestedFeatureNames.contains) val featureNamePrefixPairs = featureNames.map((_, featureNamePrefix)) - (featureAnchorWithSource, featureNamePrefixPairs, transformer) + TransformInfo(featureAnchorWithSource, featureNamePrefixPairs, transformer) case _ => throw new FeathrFeatureTransformationException(ErrorLabel.FEATHR_USER_ERROR, s"Unsupported transformer $extractor for features: $requestedFeatureNames") @@ -803,17 +813,17 @@ private[offline] object FeatureTransformation { } // to avoid name conflict between feature names and the raw data field names - val sourceKeyExtractors = transformInfo.map(_._1.featureAnchor.sourceKeyExtractor) + val sourceKeyExtractors = transformInfo.map(_.featureAnchorWithSource.featureAnchor.sourceKeyExtractor) assert(sourceKeyExtractors.map(_.toString).distinct.size == 1) - val transformers = transformInfo map (_._3) + val transformers = transformInfo map (_.transformer) /* * Transform the given RDD by applying extractors to each row to create an RDD[Row] where each Row * represents keys and feature values */ val spark = SparkSession.builder().getOrCreate() - val userProvidedFeatureTypes = transformInfo.flatMap(_._1.featureAnchor.getFeatureTypes.getOrElse(Map.empty[String, FeatureTypes])).toMap + val userProvidedFeatureTypes = transformInfo.flatMap(_.featureAnchorWithSource.featureAnchor.getFeatureTypes.getOrElse(Map.empty[String, FeatureTypes])).toMap val FeatureTypeInferenceContext(featureTypeAccumulators) = FeatureTransformation.getTypeInferenceContext(spark, userProvidedFeatureTypes, requestedFeatureNames) val transformedRdd = filteredFactData map { record => @@ -849,7 +859,7 @@ private[offline] object FeatureTransformation { val featureFormat = FeatureColumnFormat.FDS_TENSOR val featureColumnFormats = requestedFeatureNames.map(name => name -> featureFormat).toMap - val transformedInfo = TransformedResult(transformInfo.flatMap(_._2), transformedDF, featureColumnFormats, inferredFeatureTypeConfigs) + val transformedInfo = TransformedResult(transformInfo.flatMap(_.featureNamePrefixPairs), transformedDF, featureColumnFormats, inferredFeatureTypeConfigs) KeyedTransformedResult(keyNames, transformedInfo) } @@ -893,8 +903,8 @@ private[offline] object FeatureTransformation { } /* - * Retain feature values for only the requested features, and represent each feature value as a term-vector or as - * a tensor, as specified. If tensors are required, create a row for each feature value (that is, the tensor). + * Retain feature values for only the requested features, and represent each feature value as + * a tensor, as specified. 
*/ val featureValuesWithType = requestedFeatureNames map { name => features.get(name) map { From c617cd6a3a1f61dec9c54d3cc6bec9c57e647f71 Mon Sep 17 00:00:00 2001 From: Jinghui Mo Date: Tue, 4 Oct 2022 16:53:36 -0400 Subject: [PATCH 3/4] Code refactor --- .../feathr/common/SparkRowExtractor.scala | 19 ------------------ .../feathr/common/WorkWithAvroRdd.scala | 20 +++++++++++++++++++ .../offline/job/FeatureTransformation.scala | 12 +++++------ 3 files changed, 25 insertions(+), 26 deletions(-) create mode 100644 src/main/scala/com/linkedin/feathr/common/WorkWithAvroRdd.scala diff --git a/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala b/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala index 7664868cd..ad088ac0a 100644 --- a/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala +++ b/src/main/scala/com/linkedin/feathr/common/SparkRowExtractor.scala @@ -1,9 +1,5 @@ package com.linkedin.feathr.common -import org.apache.avro.generic.IndexedRecord -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame - /** * An extractor trait that provides APIs to transform a Spark GenericRowWithSchema into feature values */ @@ -22,19 +18,4 @@ trait SparkRowExtractor { * @return A map of feature name to feature value */ def getFeaturesFromRow(datum: Any): Map[String, FeatureValue] - - /** - * Whether the Row extractor needs a one-time batch preprocessing before calling - * getKeyFromRow() and getFeaturesFromRow(). - * This is especially useful when users need to convert the source DataFrame - * into specific datatype, e.g. Avro GenericRecord or SpecificRecord. - */ - def isLowLevelRddExtractor() = false - - /** - * One time batch preprocess the input data source into a RDD[IndexedRecord] for feature extraction later - * @param df input data source - * @return batch preprocessed dataframe, as RDD[IndexedRecord] - */ - def convertToAvroRdd(df: DataFrame) : RDD[IndexedRecord] = throw new NotImplementedError("Batch preprocess is not implemented") } \ No newline at end of file diff --git a/src/main/scala/com/linkedin/feathr/common/WorkWithAvroRdd.scala b/src/main/scala/com/linkedin/feathr/common/WorkWithAvroRdd.scala new file mode 100644 index 000000000..d533dee9a --- /dev/null +++ b/src/main/scala/com/linkedin/feathr/common/WorkWithAvroRdd.scala @@ -0,0 +1,20 @@ +package com.linkedin.feathr.common + +import org.apache.avro.generic.IndexedRecord +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame + +/** + * If an AnchorExtractor only works on a Avro record, it should extends + * this trait, and use convertToAvroRdd to do a one-time batch conversion of DataFrame to RDD of their choice. + * convertToAvroRdd will be called by Feathr engine before calling getKeyFromRow() and getFeaturesFromRow() in AnchorExtractor. 
+ */ +trait WorkWithAvroRdd { + + /** + * One time batch converting the input data source into a RDD[IndexedRecord] for feature extraction later + * @param df input data source + * @return batch preprocessed dataframe, as RDD[IndexedRecord] + */ + def convertToAvroRdd(df: DataFrame) : RDD[IndexedRecord] = throw new UnsupportedOperationException("Batch preprocess is not implemented") +} diff --git a/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala b/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala index 885d895b7..5b75c1ab7 100644 --- a/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala +++ b/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala @@ -491,13 +491,11 @@ private[offline] object FeatureTransformation { val keyExtractor = anchorsWithSameSource.head._1.featureAnchor.sourceKeyExtractor val featureAnchorWithSource = anchorsWithSameSource.keys.toSeq val selectedFeatures = anchorsWithSameSource.flatMap(_._2.featureNames).toSeq - val isAvroBasedExtractor = featureAnchorWithSource + val isAvroRddBasedExtractor = featureAnchorWithSource .map(_.featureAnchor.extractor) - .filter(extractor => - extractor.isInstanceOf[AnchorExtractor[Any]] && - classOf[IndexedRecord].isAssignableFrom(extractor.asInstanceOf[AnchorExtractor[Any]].getInputType) + .filter(extractor => extractor.isInstanceOf[WorkWithAvroRdd] ).nonEmpty - val transformedResults: Seq[KeyedTransformedResult] = if (isAvroBasedExtractor) { + val transformedResults: Seq[KeyedTransformedResult] = if (isAvroRddBasedExtractor) { // If there are features are defined using AVRO record based extractor, run RDD based feature transformation val sourceAccessor = featureGroupingFactors.source val sourceRdd = sourceAccessor.asInstanceOf[NonTimeBasedDataSourceAccessor].get() @@ -788,11 +786,11 @@ private[offline] object FeatureTransformation { s"Key extractor ${keyExtractor} must extends MVELSourceKeyExtractor.") } val extractor = keyExtractor.asInstanceOf[MVELSourceKeyExtractor] - if (!extractor.anchorExtractorV1.isLowLevelRddExtractor()) { + if (!extractor.anchorExtractorV1.isInstanceOf[WorkWithAvroRdd]) { throw new FeathrException(ErrorLabel.FEATHR_ERROR, s"Error processing requested Feature :${requestedFeatureNames}. 
" + s"isLowLevelRddExtractor() should return true and convertToAvroRdd should be implemented.") } - val rdd = extractor.anchorExtractorV1.convertToAvroRdd(df) + val rdd = extractor.anchorExtractorV1.asInstanceOf[WorkWithAvroRdd].convertToAvroRdd(df) val filteredFactData = applyBloomFilterRdd(keyExtractor, rdd, bloomFilter) // Build a sequence of 3-tuple of (FeatureAnchorWithSource, featureNamePrefixPairs, AnchorExtractorBase) From 4abe5429cb45a321a87586858ef84581055a9f26 Mon Sep 17 00:00:00 2001 From: Jinghui Mo Date: Wed, 5 Oct 2022 15:22:43 -0400 Subject: [PATCH 4/4] Rename a trait --- .../{WorkWithAvroRdd.scala => CanConvertToAvroRDD.scala} | 4 ++-- .../anchored/keyExtractor/MVELSourceKeyExtractor.scala | 2 +- .../linkedin/feathr/offline/job/FeatureTransformation.scala | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) rename src/main/scala/com/linkedin/feathr/common/{WorkWithAvroRdd.scala => CanConvertToAvroRDD.scala} (80%) diff --git a/src/main/scala/com/linkedin/feathr/common/WorkWithAvroRdd.scala b/src/main/scala/com/linkedin/feathr/common/CanConvertToAvroRDD.scala similarity index 80% rename from src/main/scala/com/linkedin/feathr/common/WorkWithAvroRdd.scala rename to src/main/scala/com/linkedin/feathr/common/CanConvertToAvroRDD.scala index d533dee9a..7051a308c 100644 --- a/src/main/scala/com/linkedin/feathr/common/WorkWithAvroRdd.scala +++ b/src/main/scala/com/linkedin/feathr/common/CanConvertToAvroRDD.scala @@ -9,12 +9,12 @@ import org.apache.spark.sql.DataFrame * this trait, and use convertToAvroRdd to do a one-time batch conversion of DataFrame to RDD of their choice. * convertToAvroRdd will be called by Feathr engine before calling getKeyFromRow() and getFeaturesFromRow() in AnchorExtractor. */ -trait WorkWithAvroRdd { +trait CanConvertToAvroRDD { /** * One time batch converting the input data source into a RDD[IndexedRecord] for feature extraction later * @param df input data source * @return batch preprocessed dataframe, as RDD[IndexedRecord] */ - def convertToAvroRdd(df: DataFrame) : RDD[IndexedRecord] = throw new UnsupportedOperationException("Batch preprocess is not implemented") + def convertToAvroRdd(df: DataFrame) : RDD[IndexedRecord] } diff --git a/src/main/scala/com/linkedin/feathr/offline/anchored/keyExtractor/MVELSourceKeyExtractor.scala b/src/main/scala/com/linkedin/feathr/offline/anchored/keyExtractor/MVELSourceKeyExtractor.scala index 438a209b5..bf5108e8b 100644 --- a/src/main/scala/com/linkedin/feathr/offline/anchored/keyExtractor/MVELSourceKeyExtractor.scala +++ b/src/main/scala/com/linkedin/feathr/offline/anchored/keyExtractor/MVELSourceKeyExtractor.scala @@ -87,5 +87,5 @@ private[feathr] class MVELSourceKeyExtractor(val anchorExtractorV1: AnchorExtrac // to the observation data // The default toString does not work, because toString of each object have different values override def toString: String = getClass.getSimpleName + " with keyExprs:" + keyExprs.mkString(" key:") + - anchorExtractorV1.toString + "anchorExtractor:" + anchorExtractorV1.toString } diff --git a/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala b/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala index 5b75c1ab7..7b106572b 100644 --- a/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala +++ b/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala @@ -493,7 +493,7 @@ private[offline] object FeatureTransformation { val selectedFeatures = 
anchorsWithSameSource.flatMap(_._2.featureNames).toSeq val isAvroRddBasedExtractor = featureAnchorWithSource .map(_.featureAnchor.extractor) - .filter(extractor => extractor.isInstanceOf[WorkWithAvroRdd] + .filter(extractor => extractor.isInstanceOf[CanConvertToAvroRDD] ).nonEmpty val transformedResults: Seq[KeyedTransformedResult] = if (isAvroRddBasedExtractor) { // If there are features are defined using AVRO record based extractor, run RDD based feature transformation @@ -786,11 +786,11 @@ private[offline] object FeatureTransformation { s"Key extractor ${keyExtractor} must extends MVELSourceKeyExtractor.") } val extractor = keyExtractor.asInstanceOf[MVELSourceKeyExtractor] - if (!extractor.anchorExtractorV1.isInstanceOf[WorkWithAvroRdd]) { + if (!extractor.anchorExtractorV1.isInstanceOf[CanConvertToAvroRDD]) { throw new FeathrException(ErrorLabel.FEATHR_ERROR, s"Error processing requested Feature :${requestedFeatureNames}. " + s"isLowLevelRddExtractor() should return true and convertToAvroRdd should be implemented.") } - val rdd = extractor.anchorExtractorV1.asInstanceOf[WorkWithAvroRdd].convertToAvroRdd(df) + val rdd = extractor.anchorExtractorV1.asInstanceOf[CanConvertToAvroRDD].convertToAvroRdd(df) val filteredFactData = applyBloomFilterRdd(keyExtractor, rdd, bloomFilter) // Build a sequence of 3-tuple of (FeatureAnchorWithSource, featureNamePrefixPairs, AnchorExtractorBase)
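---

To make the new extension point concrete, below is a minimal sketch of a user-defined extractor that opts into the RDD-based transformation path introduced in this series. It assumes the CanConvertToAvroRDD, AnchorExtractor and FeatureValue APIs exactly as shown in the diffs above (including the FeatureValue(value, FeatureTypes) constructor used in TestFeatureValueTypeValidator); the class name, Avro field names, feature name, and the Row-to-Avro helper are illustrative placeholders, not part of the patches.

import com.linkedin.feathr.common.{AnchorExtractor, CanConvertToAvroRDD, FeatureTypes, FeatureValue}
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

// Illustrative extractor: because it mixes in CanConvertToAvroRDD, FeatureTransformation
// routes its anchors through transformFeaturesOnAvroRecord instead of the DataFrame row path.
class SampleAvroRddExtractor extends AnchorExtractor[IndexedRecord] with CanConvertToAvroRDD {

  // One-time batch conversion of the source DataFrame into RDD[IndexedRecord];
  // Feathr calls this before getKeyFromRow()/getFeaturesFromRow() are evaluated per record.
  override def convertToAvroRdd(df: DataFrame): RDD[IndexedRecord] =
    df.rdd.map(rowToAvro)

  // How a Row maps back to an Avro record is up to the user; this helper is a placeholder.
  private def rowToAvro(row: Row): IndexedRecord = ???

  override def getKey(datum: IndexedRecord): Seq[String] = {
    val record = datum.asInstanceOf[GenericRecord]
    Seq(String.valueOf(record.get("memberId"))) // "memberId" is an assumed source field
  }

  override def getFeatures(datum: IndexedRecord): Map[String, FeatureValue] = {
    val record = datum.asInstanceOf[GenericRecord]
    val pageViews = record.get("pageViewCount").asInstanceOf[Number].floatValue() // assumed source field
    Map("f_page_view_count" -> new FeatureValue(pageViews, FeatureTypes.NUMERIC))
  }
}

An anchor would reference this class by name in its extractor field, the same as any other extractor-based anchor; the MVELSourceKeyExtractor wrapping it then delegates key extraction to getKey() on each converted Avro record.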
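The FeatureValueTypeValidator changes above thread the feature name into validation errors. A minimal usage sketch, assuming the FeatureTypeConfig(FeatureTypes) constructor and the validate(featureName, featureValue, featureTypeConfig) ordering shown in the diffs; the feature name and value are made up for illustration.

import com.linkedin.feathr.common.{FeatureTypeConfig, FeatureTypes, FeatureValue}
import com.linkedin.feathr.offline.util.FeatureValueTypeValidator

val declaredType = new FeatureTypeConfig(FeatureTypes.NUMERIC)  // type declared in the Feathr config
val extracted = new FeatureValue(3.0f, FeatureTypes.NUMERIC)    // value produced by an extractor
// Returns silently when the types agree; otherwise throws a FeathrException whose message now names the feature.
FeatureValueTypeValidator.validate("f_page_view_count", extracted, declaredType)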