Refactored utils
Signed-off-by: changyonglik <[email protected]>
ahlag committed Aug 4, 2022
1 parent 52eb35f commit 2d41211
Showing 5 changed files with 23 additions and 57 deletions.
10 changes: 3 additions & 7 deletions BatchDataLoader.scala
@@ -5,7 +5,7 @@ import com.linkedin.feathr.offline.config.location.InputLocation
 import com.linkedin.feathr.offline.generation.SparkIOUtils
 import com.linkedin.feathr.offline.job.DataSourceUtils.getSchemaFromAvroDataFile
 import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
-import com.linkedin.feathr.offline.util.DelimiterUtils.escape
+import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption
 import org.apache.avro.Schema
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapred.JobConf
@@ -65,12 +65,8 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: InputLocation
 
     log.info(s"Loading ${location} as DataFrame, using parameters ${dataIOParametersWithSplitSize}")
 
-    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
-    val sqlContext = ss.sqlContext
-    // Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
-    val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
-    // If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
-    val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption
+    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (only for CSV and TSV)
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
 
     try {
       import scala.util.control.Breaks._
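The value consumed here comes from the spark.feathr.inputFormat.csvOptions.sep SQL conf. Below is a minimal sketch of how a caller might point it at tab-separated input; the object name, local session setup, and file path are illustrative assumptions, not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

object TsvLoadSketch extends App {
  // Hypothetical local session; in Feathr the session is supplied by the job.
  val ss = SparkSession.builder()
    .appName("tsv-load-sketch")
    .master("local[*]")
    .getOrCreate()

  // Spark's runtime conf and sqlContext.getConf read the same store, so this
  // is the value BatchDataLoader picks up above.
  ss.conf.set("spark.feathr.inputFormat.csvOptions.sep", "\t")

  // Placeholder path; any tab-separated file works.
  val df = ss.read.option("sep", "\t").csv("/tmp/example.tsv")
  df.show(5)
}
```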
19 changes: 4 additions & 15 deletions CsvDataLoader.scala
@@ -10,6 +10,8 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import java.io.File
 import scala.collection.JavaConverters._
 import scala.collection.convert.wrapAll._
+import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption
+
 import scala.io.Source
 
 /**
@@ -27,27 +29,14 @@ private[offline] class CsvDataLoader(ss: SparkSession, path: String) extends Dat
     doLoadCsvDocumentLikeAvro()._2
   }
 
-  /**
-   * Convert delimiter to an escape character (e.g. a tab character -> "\t")
-   */
-  def escape(raw: String): String = {
-    import scala.reflect.runtime.universe.{Literal, Constant}
-    Literal(Constant(raw)).toString.replaceAll("\"", "")
-  }
-
-
   /**
    * load the source data as dataframe.
    * @return an dataframe
    */
   override def loadDataFrame(): DataFrame = {
 
-    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
-    val sqlContext = ss.sqlContext
-    // Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
-    val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
-    // If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
-    val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption
+    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (only for CSV and TSV)
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
 
     try {
       log.debug(s"Loading CSV path :${path}")
25 changes: 5 additions & 20 deletions FileFormat.scala
@@ -5,6 +5,7 @@ import com.linkedin.feathr.offline.source.dataloader._
 import com.linkedin.feathr.offline.source.dataloader.jdbc.JdbcUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.DataFrame
+import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption
 
 
 object FileFormat {
@@ -29,14 +30,6 @@
 
   val DATA_FORMAT = "data.format"
 
-  /**
-   * Convert delimiter to an escape character (e.g. a tab character -> "\t")
-   */
-  def escape(raw: String): String = {
-    import scala.reflect.runtime.universe.{Literal, Constant}
-    Literal(Constant(raw)).toString.replaceAll("\"", "")
-  }
-
   /**
    * To define if the file is JDBC, Single File or Path list (default)
    * @param path
@@ -60,12 +53,8 @@
   // TODO: Complete a general loadDataFrame and replace current adhoc load data frame code
   def loadDataFrame(ss: SparkSession, path: String, format: String = CSV): DataFrame = {
 
-    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
-    val sqlContext = ss.sqlContext
-    // Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
-    val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
-    // If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
-    val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption
+    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (only for CSV and TSV)
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
 
     format match {
       case AVRO => new AvroJsonDataLoader(ss, path).loadDataFrame()
@@ -102,12 +91,8 @@

   def loadHdfsDataFrame(format: String, existingHdfsPaths: Seq[String]): DataFrame = {
 
-    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
-    val sqlContext = ss.sqlContext
-    // Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
-    val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
-    // If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
-    val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption
+    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (only for CSV and TSV)
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
 
     val df = format match {
       case CSV =>
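Given the signature above, a hedged sketch of calling the dispatch; the SparkSession `ss`, the paths, and the import of FileFormat (whose package this diff does not show) are assumed:

```scala
// Sketch only: assumes a SparkSession `ss` in scope and FileFormat imported
// from its package in this repo, which the diff above does not show.
val csvDf  = FileFormat.loadDataFrame(ss, "/tmp/data.csv")                   // format defaults to CSV
val avroDf = FileFormat.loadDataFrame(ss, "/tmp/data.json", FileFormat.AVRO) // dispatches to AvroJsonDataLoader
```

Only the CSV branch consults the delimiter conf; the other formats ignore it.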
9 changes: 8 additions & 1 deletion src/main/scala/com/linkedin/feathr/offline/util/DelimiterUtils.scala
@@ -1,13 +1,20 @@
 package com.linkedin.feathr.offline.util
+import scala.reflect.runtime.universe.{Literal, Constant}
 
 object DelimiterUtils {
 
   /**
    * Convert delimiter to an escape character (e.g. a tab character -> "\t")
    */
   def escape(raw: String): String = {
-    import scala.reflect.runtime.universe.{Literal, Constant}
     Literal(Constant(raw)).toString.replaceAll("\"", "")
   }
 
+  /**
+   * Default to "," as the delimiter if csvDelimiterOption is not properly set, else return csvDelimiterOption
+   */
+  def checkDelimiterOption(csvDelimiterOption: String): String = {
+    if (escape(csvDelimiterOption).trim.isEmpty) "," else csvDelimiterOption
+  }
+
 }
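Based on the definitions above, a minimal behavioral sketch of the two helpers; the demo object and sample values are hypothetical, and escape relies on scala-reflect, which the import in this file already pulls in:

```scala
import com.linkedin.feathr.offline.util.DelimiterUtils.{escape, checkDelimiterOption}

object DelimiterUtilsSketch extends App {
  // A real tab renders as the two-character source literal \t, so it survives trim.
  assert(escape("\t") == "\\t")

  // A tab is therefore treated as properly set and passed through unchanged.
  assert(checkDelimiterOption("\t") == "\t")

  // Empty or whitespace-only values escape to blank text and fall back to ",".
  assert(checkDelimiterOption("") == ",")
  assert(checkDelimiterOption("   ") == ",")

  println("all delimiter checks passed")
}
```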
17 changes: 3 additions & 14 deletions src/main/scala/com/linkedin/feathr/offline/util/SourceUtils.scala
@@ -51,6 +51,7 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.reflect.ClassTag
 import scala.util.Try
+import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption
 
 /**
  * Load "raw" not-yet-featurized data from HDFS data sets
@@ -65,14 +66,6 @@ private[offline] object SourceUtils {
   val FEATURE_MP_DEF_CONFIG_SUFFIX = ".conf"
   val firstRecordName = "topLevelRecord"
 
-  /**
-   * Convert delimiter to an escape character (e.g. a tab character -> "\t")
-   */
-  def escape(raw: String): String = {
-    import scala.reflect.runtime.universe.{Literal, Constant}
-    Literal(Constant(raw)).toString.replaceAll("\"", "")
-  }
-
   /**
    * get AVRO datum type of a dataset we should use to load,
    * it is determined by the expect datatype from a set of anchor transformers
@@ -674,12 +667,8 @@
     val format = FileFormat.getType(inputData.inputPath)
     log.info(s"loading ${inputData.inputPath} input Path as Format: ${format}")
 
-    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
-    val sqlContext = ss.sqlContext
-    // Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
-    val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
-    // If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
-    val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption
+    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (only for CSV and TSV)
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
 
     format match {
       case FileFormat.PATHLIST => {
