Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delimiter option for reading CSV files for Feathr #307

Merged
merged 22 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/how-to-guides/feathr-job-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ parent: How-to Guides

Since Feathr uses Spark as the underlying execution engine, there's a way to override Spark configuration by `FeathrClient.get_offline_features()` with `execution_configurations` parameters. The complete list of the available spark configuration is located in [Spark Configuration](https://spark.apache.org/docs/latest/configuration.html) (though not all of those are honored for cloud hosted Spark platforms such as Databricks), and there are a few Feathr specific ones that are documented here:

| Property Name | Default | Meaning | Since Version |
| ------------------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- |
| Property Name | Default | Meaning | Since Version |
| ------------------------- | ------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------------- |
| spark.feathr.inputFormat | None | Specify the input format if the file cannot be tell automatically. By default, Feathr will read files by parsing the file extension name; However the file/folder name doesn't have extension name, this configuration can be set to tell Feathr which format it should use to read the data. Currently can only be set for Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options). Additionally, `delta` is also supported if users want to read delta lake. | 0.2.1 |
| spark.feathr.outputFormat | None | Specify the output format. "avro" is the default behavior if this value is not set. Currently can only be set for Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options). Additionally, `delta` is also supported if users want to write delta lake. | 0.2.1 |
| spark.feathr.outputFormat | None | Specify the output format. "avro" is the default behavior if this value is not set. Currently can only be set for Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options). Additionally, `delta` is also supported if users want to write delta lake. | 0.2.1 |
| spark.feathr.inputFormat.csvOptions.sep | None | Specify the delimiter. For example, "," for commas or "\t" for tabs. (Supports both csv and tsv) | 0.6.0 |
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrInputDataExceptio
import com.linkedin.feathr.offline.config.location.DataLocation
import com.linkedin.feathr.offline.generation.SparkIOUtils
import com.linkedin.feathr.offline.job.DataSourceUtils.getSchemaFromAvroDataFile
import com.linkedin.feathr.offline.source.dataloader.jdbc.JdbcUtils
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption
import org.apache.avro.Schema
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.JobConf
Expand Down Expand Up @@ -64,31 +64,35 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
val dataPath = location.getPath

log.info(s"Loading ${location} as DataFrame, using parameters ${dataIOParametersWithSplitSize}")

// Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (Only for CSV and TSV)
val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))

try {
import scala.util.control.Breaks._
import scala.util.control.Breaks._

var dfOpt: Option[DataFrame] = None
breakable {
for(dataLoaderHandler <- dataLoaderHandlers) {
println(s"Applying dataLoaderHandler ${dataLoaderHandler}")
if (dataLoaderHandler.validatePath(dataPath)) {
dfOpt = Some(dataLoaderHandler.createDataFrame(dataPath, dataIOParametersWithSplitSize, jobConf))
break
}
var dfOpt: Option[DataFrame] = None
breakable {
for(dataLoaderHandler <- dataLoaderHandlers) {
println(s"Applying dataLoaderHandler ${dataLoaderHandler}")
if (dataLoaderHandler.validatePath(dataPath)) {
dfOpt = Some(dataLoaderHandler.createDataFrame(dataPath, dataIOParametersWithSplitSize, jobConf))
break
}
}
val df = dfOpt match {
case Some(df) => df
case _ => location.loadDf(ss, dataIOParametersWithSplitSize)
}
df
}
val df = dfOpt match {
case Some(df) => df
case _ => location.loadDf(ss, dataIOParametersWithSplitSize)
}
df
} catch {
case feathrException: FeathrInputDataException =>
println(feathrException.toString)
throw feathrException // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed.
case e: Throwable => //TODO: Analyze all thrown exceptions, instead of swalling them all, and reading as a csv
println(e.toString)
ss.read.format("csv").option("header", "true").load(dataPath)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.File
import scala.collection.JavaConverters._
import scala.collection.convert.wrapAll._
import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption

import scala.io.Source

/**
Expand All @@ -32,17 +34,21 @@ private[offline] class CsvDataLoader(ss: SparkSession, path: String) extends Dat
* @return an dataframe
*/
override def loadDataFrame(): DataFrame = {

// Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (Only for CSV and TSV)
val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))

try {
log.debug(s"Loading CSV path :${path}")
val absolutePath = new File(path).getPath
log.debug(s"Got absolute CSV path: ${absolutePath}, loading..")
ss.read.format("csv").option("header", "true").load(absolutePath)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(absolutePath)
} catch {
case _: Throwable =>
log.debug(s"Loading CSV failed, retry with class loader..")
val absolutePath = getClass.getClassLoader.getResource(path).getPath
log.debug(s"Got absolution CSV path from class loader: ${absolutePath}, loading.. ")
ss.read.format("csv").option("header", "true").load(absolutePath)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(absolutePath)
}
}

Expand Down Expand Up @@ -101,4 +107,4 @@ private[offline] class CsvDataLoader(ss: SparkSession, path: String) extends Dat
private def getArbitraryRecordName(x: AnyRef): String = {
"AnonRecord_" + Integer.toHexString(x.hashCode)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.linkedin.feathr.offline.source.dataloader._
import com.linkedin.feathr.offline.source.dataloader.jdbc.JdbcUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import com.linkedin.feathr.offline.util.DelimiterUtils.checkDelimiterOption


object FileFormat {
Expand Down Expand Up @@ -51,9 +52,13 @@ object FileFormat {

// TODO: Complete a general loadDataFrame and replace current adhoc load data frame code
def loadDataFrame(ss: SparkSession, path: String, format: String = CSV): DataFrame = {

// Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (Only for CSV and TSV)
val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))

format match {
case AVRO => new AvroJsonDataLoader(ss, path).loadDataFrame()
case CSV => ss.read.format("csv").option("header", "true").load(path)
case CSV => ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(path)
case PARQUET => new ParquetDataLoader(ss, path).loadDataFrame()
case _ => ???
}
Expand All @@ -69,23 +74,29 @@ object FileFormat {
val p = existingHdfsPaths.head.toLowerCase()
p match {
case p if p.endsWith(".csv") => CSV
// Tab-separated Format will be treated as CSV (Enum) here but with tab as the delimiter
case p if p.endsWith(".tsv") => CSV
ahlag marked this conversation as resolved.
Show resolved Hide resolved
case p if p.endsWith(".parquet") => PARQUET
case p if p.endsWith(".orc") => ORC
case p if p.endsWith(".avro.json") => AVRO_JSON
case p if p.endsWith(".avro") => AVRO
case p if p.startsWith("jdbc:") => JDBC
case _ =>
// if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format that's sepcified by user.
dataIOParameters.getOrElse(DATA_FORMAT, ss.conf.get("spark.feathr.inputFormat", AVRO)).toUpperCase
// if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format that's sepcified by user.
dataIOParameters.getOrElse(DATA_FORMAT, ss.conf.get("spark.feathr.inputFormat", AVRO)).toUpperCase
}


}

def loadHdfsDataFrame(format: String, existingHdfsPaths: Seq[String]): DataFrame = {

// Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (Only for CSV and TSV)
val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))

val df = format match {
case CSV =>
ss.read.format("csv").option("header", "true").load(existingHdfsPaths: _*)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(existingHdfsPaths: _*)
case AVRO =>
ss.read.format(AVRO_DATASOURCE).load(existingHdfsPaths: _*)
case ORC =>
Expand All @@ -102,4 +113,4 @@ object FileFormat {
}
df
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.linkedin.feathr.offline.util
import scala.reflect.runtime.universe.{Literal, Constant}

object DelimiterUtils {

/**
* Convert delimiter to an escape character (e.g. " " -> "\t")
*/
def escape(raw: String): String = {
Literal(Constant(raw)).toString.replaceAll("\"", "")
}

/**
* If rawCsvDelimiterOption is not properly set, defaults to "," as the delimiter else csvDelimiterOption
*/
def checkDelimiterOption(csvDelimiterOption: String): String = {
if (escape(csvDelimiterOption).trim.isEmpty) "," else csvDelimiterOption
}

}
Loading