Add delimiter option for reading CSV files for Feathr #307

Merged: 22 commits, Aug 16, 2022. Showing changes from 13 commits.
1 change: 1 addition & 0 deletions docs/how-to-guides/feathr-job-configuration.md
@@ -12,3 +12,4 @@ Since Feathr uses Spark as the underlying execution engine, there's a way to ove
| ------------------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- |
| spark.feathr.inputFormat | None | Specify the input format if the file format cannot be determined automatically. By default, Feathr reads files by parsing the file extension; however, if the file/folder name has no extension, this configuration can be set to tell Feathr which format to use when reading the data. Currently it can only be set to Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options). Additionally, `delta` is also supported if users want to read Delta Lake. | 0.2.1 |
| spark.feathr.outputFormat | None | Specify the output format. "avro" is the default behavior if this value is not set. Currently it can only be set to Spark built-in short names, including `json`, `parquet`, `jdbc`, `orc`, `libsvm`, `csv`, `text`. For more details, see ["Manually Specifying Options"](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#manually-specifying-options). Additionally, `delta` is also supported if users want to write to Delta Lake. | 0.2.1 |
| spark.feathr.inputFormat.csvOptions.sep | None | Specify the delimiter. For example, "," for commas or "\t" for tabs. | 0.6.0 |
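For illustration, here is a minimal sketch of how these options might be set on the Spark session that runs the Feathr job. The property names are the ones documented above; applying them at runtime via `spark.conf.set` is an assumption, since they can equally be passed as Spark job configuration.

```scala
// Sketch only: `spark` is assumed to be the SparkSession used by the Feathr job.
// Treat extension-less input as CSV and split columns on tabs instead of commas.
spark.conf.set("spark.feathr.inputFormat", "csv")
spark.conf.set("spark.feathr.inputFormat.csvOptions.sep", "\t")
```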
@@ -51,6 +51,15 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: InputLocation
loadDataFrame(Map(), new JobConf(ss.sparkContext.hadoopConfiguration))
}

/**
* Escape special characters in a string (for example, an actual tab is rendered as the two characters \t)
* by printing it as a quoted Scala literal and stripping the surrounding quotes. This is used below to
* detect delimiter settings that are effectively empty.
* @param raw the raw delimiter string read from the configuration
* @return the escaped String
*/
def escape(raw: String): String = {
import scala.reflect.runtime.universe._
Literal(Constant(raw)).toString.replaceAll("\"", "")
}
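As a point of reference (not part of the change), a small sketch of how this helper behaves for typical delimiter values; the exact rendered strings are an assumption based on how the Scala runtime-universe tree printer escapes literals.

```scala
// Sketch only: assumes access to the escape method defined above.
escape("\t") // renders the tab as the two characters \t, so .trim.isEmpty is false and the tab is kept
escape(",")  // returns "," unchanged
escape(" ")  // a lone space trims to empty, so the caller falls back to the default ","
```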

/**
* Load the source data as a DataFrame.
* @param dataIOParameters extra parameters
@@ -64,31 +73,39 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: InputLocation
val dataPath = location.getPath

log.info(s"Loading ${location} as DataFrame, using parameters ${dataIOParametersWithSplitSize}")

// Resolve the CSV delimiter configured via spark.feathr.inputFormat.csvOptions.sep
val sqlContext = ss.sqlContext
// Read the raw delimiter value from the configuration, defaulting to ","
val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
// If the configured delimiter is empty or whitespace-only, fall back to ","; otherwise use it as-is
val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption

try {
import scala.util.control.Breaks._

var dfOpt: Option[DataFrame] = None
breakable {
for(dataLoaderHandler <- dataLoaderHandlers) {
println(s"Applying dataLoaderHandler ${dataLoaderHandler}")
if (dataLoaderHandler.validatePath(dataPath)) {
dfOpt = Some(dataLoaderHandler.createDataFrame(dataPath, dataIOParametersWithSplitSize, jobConf))
break
}
}
}
val df = dfOpt match {
case Some(df) => df
case _ => location.loadDf(ss, dataIOParametersWithSplitSize)
}
df
} catch {
case feathrException: FeathrInputDataException =>
println(feathrException.toString)
throw feathrException // Throwing exception to avoid dataLoaderHandler hook exception from being swallowed.
case e: Throwable => //TODO: Analyze all thrown exceptions instead of swallowing them all and falling back to reading as CSV
println(e.toString)
ss.read.format("csv").option("header", "true").load(dataPath)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
}
}
}
}
@@ -27,22 +27,40 @@ private[offline] class CsvDataLoader(ss: SparkSession, path: String) extends Dat
doLoadCsvDocumentLikeAvro()._2
}

/**
* Escape special characters in a string (for example, a tab becomes the two characters \t) by printing it
* as a quoted Scala literal and stripping the quotes; used to detect effectively empty delimiter settings.
* @param raw the raw delimiter string
* @return the escaped String
*/
def escape(raw: String): String = {
import scala.reflect.runtime.universe._
Literal(Constant(raw)).toString.replaceAll("\"", "")
}


/**
* Load the source data as a DataFrame.
* @return a DataFrame
*/
override def loadDataFrame(): DataFrame = {

// Resolve the CSV delimiter configured via spark.feathr.inputFormat.csvOptions.sep
val sqlContext = ss.sqlContext
// Read the raw delimiter value from the configuration, defaulting to ","
val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
// If the configured delimiter is empty or whitespace-only, fall back to ","; otherwise use it as-is
val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption

try {
log.debug(s"Loading CSV path :${path}")
val absolutePath = new File(path).getPath
log.debug(s"Got absolute CSV path: ${absolutePath}, loading..")
ss.read.format("csv").option("header", "true").load(absolutePath)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(absolutePath)
} catch {
case _: Throwable =>
log.debug(s"Loading CSV failed, retry with class loader..")
val absolutePath = getClass.getClassLoader.getResource(path).getPath
log.debug(s"Got absolution CSV path from class loader: ${absolutePath}, loading.. ")
ss.read.format("csv").option("header", "true").load(absolutePath)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(absolutePath)
}
}

@@ -101,4 +119,4 @@ private[offline] class CsvDataLoader(ss: SparkSession, path: String) extends Dat
private def getArbitraryRecordName(x: AnyRef): String = {
"AnonRecord_" + Integer.toHexString(x.hashCode)
}
}
}
@@ -29,6 +29,15 @@ object FileFormat {

val DATA_FORMAT = "data.format"

/**
* Escape special characters in a string (for example, a tab becomes the two characters \t) by printing it
* as a quoted Scala literal and stripping the quotes; used to detect effectively empty delimiter settings.
* @param raw the raw delimiter string
* @return the escaped String
*/
def escape(raw: String): String = {
import scala.reflect.runtime.universe._
Literal(Constant(raw)).toString.replaceAll("\"", "")
}

/**
* Determine whether the file is JDBC, a single file, or a path list (default).
* @param path
@@ -51,9 +60,17 @@

// TODO: Complete a general loadDataFrame and replace the current ad hoc data frame loading code
def loadDataFrame(ss: SparkSession, path: String, format: String = CSV): DataFrame = {

// Resolve the CSV delimiter configured via spark.feathr.inputFormat.csvOptions.sep
val sqlContext = ss.sqlContext
// Read the raw delimiter value from the configuration, defaulting to ","
val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
// If the configured delimiter is empty or whitespace-only, fall back to ","; otherwise use it as-is
val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption

format match {
case AVRO => new AvroJsonDataLoader(ss, path).loadDataFrame()
case CSV => ss.read.format("csv").option("header", "true").load(path)
case CSV => ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(path)
case PARQUET => new ParquetDataLoader(ss, path).loadDataFrame()
case _ => ???
}
@@ -69,23 +86,32 @@ object FileFormat {
val p = existingHdfsPaths.head.toLowerCase()
p match {
case p if p.endsWith(".csv") => CSV
case p if p.endsWith(".tsv") => CSV
case p if p.endsWith(".parquet") => PARQUET
case p if p.endsWith(".orc") => ORC
case p if p.endsWith(".avro.json") => AVRO_JSON
case p if p.endsWith(".avro") => AVRO
case p if p.startsWith("jdbc:") => JDBC
case _ =>
// if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format that's specified by the user.
dataIOParameters.getOrElse(DATA_FORMAT, ss.conf.get("spark.feathr.inputFormat", AVRO)).toUpperCase
}


}
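To make the behavior above concrete, here is a hedged sketch of exercising `loadDataFrame` with the delimiter option. It relies only on the `loadDataFrame(ss: SparkSession, path: String, format: String = CSV)` signature shown above; the SparkSession `ss`, the `.tsv` path, and calling from within the same package scope are assumptions.

```scala
// Sketch only: read a hypothetical tab-separated file through FileFormat.
ss.sqlContext.setConf("spark.feathr.inputFormat.csvOptions.sep", "\t")
val tsvDf = FileFormat.loadDataFrame(ss, "/data/events.tsv") // format defaults to CSV
tsvDf.show(5)
```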

def loadHdfsDataFrame(format: String, existingHdfsPaths: Seq[String]): DataFrame = {

// Resolve the CSV delimiter configured via spark.feathr.inputFormat.csvOptions.sep
val sqlContext = ss.sqlContext
// Read the raw delimiter value from the configuration, defaulting to ","
val rawCsvDelimiterOption = sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ",")
// If the configured delimiter is empty or whitespace-only, fall back to ","; otherwise use it as-is
val csvDelimiterOption = if (escape(rawCsvDelimiterOption).trim.isEmpty) "," else rawCsvDelimiterOption

val df = format match {
case CSV =>
ss.read.format("csv").option("header", "true").load(existingHdfsPaths: _*)
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(existingHdfsPaths: _*)
case AVRO =>
ss.read.format(AVRO_DATASOURCE).load(existingHdfsPaths: _*)
case ORC =>
@@ -102,4 +128,4 @@
}
df
}
}
}