diff --git a/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala b/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
index a813032f9..93004a99f 100644
--- a/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
+++ b/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
@@ -5,7 +5,7 @@ import com.linkedin.feathr.offline.config.location.InputLocation
 import com.linkedin.feathr.offline.generation.SparkIOUtils
 import com.linkedin.feathr.offline.job.DataSourceUtils.getSchemaFromAvroDataFile
 import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
-import com.linkedin.feathr.offline.util.DelimiterUtils.escape
+import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption
 import org.apache.avro.Schema
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapred.JobConf
@@ -65,12 +65,8 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: InputLocation

     log.info(s"Loading ${location} as DataFrame, using parameters ${dataIOParametersWithSplitSize}")

-    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
-    val sqlContext = ss.sqlContext
-    // Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
-    val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
-    // If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
-    val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption
+    // Get csvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep and check that it is set properly (only used for CSV and TSV)
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))

     try {
       import scala.util.control.Breaks._
diff --git a/src/main/scala/com/linkedin/feathr/offline/source/dataloader/CsvDataLoader.scala b/src/main/scala/com/linkedin/feathr/offline/source/dataloader/CsvDataLoader.scala
index d7d56e305..8ea87262a 100644
--- a/src/main/scala/com/linkedin/feathr/offline/source/dataloader/CsvDataLoader.scala
+++ b/src/main/scala/com/linkedin/feathr/offline/source/dataloader/CsvDataLoader.scala
@@ -10,6 +10,8 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import java.io.File
 import scala.collection.JavaConverters._
 import scala.collection.convert.wrapAll._
+import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption
+
 import scala.io.Source

 /**
@@ -27,27 +29,14 @@ private[offline] class CsvDataLoader(ss: SparkSession, path: String) extends Dat

     doLoadCsvDocumentLikeAvro()._2
   }

-  /**
-   * Convert delimiter to an escape character (e.g. " " -> "\t")
-   */
-  def escape(raw: String): String = {
-    import scala.reflect.runtime.universe.{Literal, Constant}
-    Literal(Constant(raw)).toString.replaceAll("\"", "")
-  }
-
-
   /**
    * load the source data as dataframe.
    * @return an dataframe
    */
   override def loadDataFrame(): DataFrame = {
-    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
-    val sqlContext = ss.sqlContext
-    // Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
-    val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
-    // If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
-    val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption
+    // Get csvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep and check that it is set properly (only used for CSV and TSV)
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))

     try {
       log.debug(s"Loading CSV path :${path}")
diff --git a/src/main/scala/com/linkedin/feathr/offline/source/dataloader/hdfs/FileFormat.scala b/src/main/scala/com/linkedin/feathr/offline/source/dataloader/hdfs/FileFormat.scala
index bf8575105..89e2066d7 100644
--- a/src/main/scala/com/linkedin/feathr/offline/source/dataloader/hdfs/FileFormat.scala
+++ b/src/main/scala/com/linkedin/feathr/offline/source/dataloader/hdfs/FileFormat.scala
@@ -5,6 +5,7 @@ import com.linkedin.feathr.offline.source.dataloader._
 import com.linkedin.feathr.offline.source.dataloader.jdbc.JdbcUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.DataFrame
+import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption

 object FileFormat {

@@ -29,14 +30,6 @@ object FileFormat {

   val DATA_FORMAT = "data.format"

-  /**
-   * Convert delimiter to an escape character (e.g. " " -> "\t")
-   */
-  def escape(raw: String): String = {
-    import scala.reflect.runtime.universe.{Literal, Constant}
-    Literal(Constant(raw)).toString.replaceAll("\"", "")
-  }
-
   /**
    * To define if the file is JDBC, Single File or Path list (default)
    * @param path
@@ -60,12 +53,8 @@ object FileFormat {

   // TODO: Complete a general loadDataFrame and replace current adhoc load data frame code
   def loadDataFrame(ss: SparkSession, path: String, format: String = CSV): DataFrame = {
-    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
-    val sqlContext = ss.sqlContext
-    // Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
-    val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
-    // If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
-    val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption
+    // Get csvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep and check that it is set properly (only used for CSV and TSV)
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))

     format match {
       case AVRO => new AvroJsonDataLoader(ss, path).loadDataFrame()
@@ -102,12 +91,8 @@ object FileFormat {

   def loadHdfsDataFrame(format: String, existingHdfsPaths: Seq[String]): DataFrame = {

-    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
-    val sqlContext = ss.sqlContext
-    // Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
-    val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
-    // If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
-    val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption
+    // Get csvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep and check that it is set properly (only used for CSV and TSV)
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))

     val df = format match {
       case CSV =>
diff --git a/src/main/scala/com/linkedin/feathr/offline/util/DelimiterUtils.scala b/src/main/scala/com/linkedin/feathr/offline/util/DelimiterUtils.scala
index ce9e2cacd..bdf247acf 100644
--- a/src/main/scala/com/linkedin/feathr/offline/util/DelimiterUtils.scala
+++ b/src/main/scala/com/linkedin/feathr/offline/util/DelimiterUtils.scala
@@ -1,4 +1,5 @@
 package com.linkedin.feathr.offline.util
+import scala.reflect.runtime.universe.{Literal, Constant}

 object DelimiterUtils {

@@ -6,8 +7,14 @@ object DelimiterUtils {
    * Convert delimiter to an escape character (e.g. " " -> "\t")
    */
   def escape(raw: String): String = {
-    import scala.reflect.runtime.universe.{Literal, Constant}
     Literal(Constant(raw)).toString.replaceAll("\"", "")
   }

+  /**
+   * Return csvDelimiterOption if it is properly set; otherwise default to "," as the delimiter.
+   */
+  def checkDelimiterOption(csvDelimiterOption: String): String = {
+    if (escape(csvDelimiterOption).trim.isEmpty) "," else csvDelimiterOption
+  }
+
 }
diff --git a/src/main/scala/com/linkedin/feathr/offline/util/SourceUtils.scala b/src/main/scala/com/linkedin/feathr/offline/util/SourceUtils.scala
index 41c124968..9729bc1e9 100644
--- a/src/main/scala/com/linkedin/feathr/offline/util/SourceUtils.scala
+++ b/src/main/scala/com/linkedin/feathr/offline/util/SourceUtils.scala
@@ -51,6 +51,7 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.reflect.ClassTag
 import scala.util.Try
+import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption

 /**
  * Load "raw" not-yet-featurized data from HDFS data sets
@@ -65,14 +66,6 @@ private[offline] object SourceUtils {
   val FEATURE_MP_DEF_CONFIG_SUFFIX = ".conf"
   val firstRecordName = "topLevelRecord"

-  /**
-   * Convert delimiter to an escape character (e.g. " " -> "\t")
-   */
-  def escape(raw: String): String = {
-    import scala.reflect.runtime.universe.{Literal, Constant}
-    Literal(Constant(raw)).toString.replaceAll("\"", "")
-  }
-
   /**
    * get AVRO datum type of a dataset we should use to load,
    * it is determined by the expect datatype from a set of anchor transformers
@@ -674,12 +667,8 @@ private[offline] object SourceUtils {
     val format = FileFormat.getType(inputData.inputPath)
     log.info(s"loading ${inputData.inputPath} input Path as Format: ${format}")

-    // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep
-    val sqlContext = ss.sqlContext
-    // Get rawCsvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep
-    val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
-    // If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
-    val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption
+    // Get csvDelimiterOption from spark.feathr.inputFormat.csvOptions.sep and check that it is set properly (only used for CSV and TSV)
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))

     format match {
       case FileFormat.PATHLIST => {
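
Reviewer note: the sketch below shows the behavior of the consolidated helper at the five call sites above. The escape and checkDelimiterOption bodies are copied from DelimiterUtils.scala in this diff; the wrapping DelimiterCheckExample object, its main method, and the example conf call are illustrative only and not part of the change. It assumes scala-reflect is on the classpath, which these modules already require for scala.reflect.runtime.universe.

import scala.reflect.runtime.universe.{Constant, Literal}

object DelimiterCheckExample {
  // Copied from DelimiterUtils: render the delimiter as a source literal
  // (a real tab prints as the two characters \t), then strip the quotes.
  def escape(raw: String): String =
    Literal(Constant(raw)).toString.replaceAll("\"", "")

  // Copied from DelimiterUtils: if the escaped separator is blank, fall back to ",".
  def checkDelimiterOption(csvDelimiterOption: String): String =
    if (escape(csvDelimiterOption).trim.isEmpty) "," else csvDelimiterOption

  def main(args: Array[String]): Unit = {
    assert(checkDelimiterOption("\t") == "\t") // a tab escapes to the visible "\t", so TSV separators are kept
    assert(checkDelimiterOption("|") == "|")   // any non-blank separator passes through unchanged
    assert(checkDelimiterOption(" ") == ",")   // whitespace-only values fall back to ","
    assert(checkDelimiterOption("") == ",")    // empty values fall back to ","
    // A job opts into TSV by setting the conf key read at every call site above, e.g.:
    //   ss.sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")
    println("all delimiter checks passed")
  }
}

Centralizing the check this way deletes the three duplicated escape definitions (CsvDataLoader, FileFormat, SourceUtils) and replaces the inline fallback expression repeated at five call sites, so the blank-separator rule now lives in one place.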