Refine logging message with detailed unsplittable reason
WeichenXu123 committed Jul 17, 2019
1 parent 4ee25d6 commit 736587b
Showing 6 changed files with 54 additions and 6 deletions.
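For context, a minimal scenario that exercises the refined warning (app setup and paths are illustrative, not part of this commit): reading a single large gzip-compressed file yields exactly one partition, because gzip is not a splittable codec, and the new message states that reason explicitly.

import org.apache.spark.sql.SparkSession

object UnsplittableWarningDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("unsplittable-warning-demo")
      .master("local[*]")
      .getOrCreate()

    // A single .gz input cannot be split, so the scan plans one partition;
    // once the file size exceeds the warning threshold, the driver logs the
    // refined message with the unsplittable reason.
    val df = spark.read.text("/data/big-log.txt.gz") // placeholder path
    println(s"partitions = ${df.rdd.getNumPartitions}") // expected: 1

    spark.stop()
  }
}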
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -210,13 +210,13 @@ class HadoopRDD[K, V](
       } else {
         allInputSplits
       }
-      if (inputSplits.length == 1) {
+      if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
         val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
         val path = fileSplit.getPath
         if (!Utils.isFileSplittable(path, codecFactory)
             && fileSplit.getLength > conf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
-          logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
-            s"rdd partition have to deal with the whole file and consume large time.")
+          logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+            s"partition, because the file is compressed by an unsplittable compression codec.")
         }
       }
       val array = new Array[Partition](inputSplits.size)
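The guard above comes from Utils.isFileSplittable. A sketch of the underlying check, assuming Hadoop's standard codec registry (a paraphrase, not the exact Spark source):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}

// A file is splittable when no compression codec claims it (plain text),
// or when its codec supports splitting (e.g. bzip2); gzip does not.
def isFileSplittable(path: Path, codecFactory: CompressionCodecFactory): Boolean = {
  val codec = codecFactory.getCodec(path)
  codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}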
8 changes: 6 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -46,6 +46,10 @@ abstract class FileScan(
     false
   }

+  def getFileUnSplittableReason(path: Path): String = {
+    "Unknown"
+  }
+
   override def description(): String = {
     val locationDesc =
       fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
@@ -98,8 +102,8 @@
       val path = new Path(splitFiles(0).filePath)
       if (!isSplitable(path) && splitFiles(0).length >
           sparkSession.sparkContext.getConf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
-        logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
-          s"rdd partition have to deal with the whole file and consume large time.")
+        logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+          s"partition, the reason is: ${getFileUnSplittableReason(path)}")
       }
     }

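Taken together, the two FileScan hunks form a template-method pattern: the base class owns the warning and interpolates a reason hook that subclasses refine. A stand-alone sketch of that shape (simplified, hypothetical names, not Spark's real classes):

abstract class ScanLike {
  def isSplitable(path: String): Boolean
  def fileUnSplittableReason(path: String): String = "Unknown"

  // The base class decides *when* to warn; subclasses decide *why*
  // the file could not be split.
  def warnIfLargeSingleFile(path: String, length: Long, threshold: Long): Unit =
    if (!isSplitable(path) && length > threshold) {
      println(s"Loading one large unsplittable file $path with only one " +
        s"partition, the reason is: ${fileUnSplittableReason(path)}")
    }
}

class WholeTextScanLike extends ScanLike {
  override def isSplitable(path: String): Boolean = false
  override def fileUnSplittableReason(path: String): String =
    "the text datasource is set to wholetext mode"
}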
8 changes: 8 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
@@ -38,4 +38,12 @@ abstract class TextBasedFileScan(
     sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))

   override def isSplitable(path: Path): Boolean = Utils.isFileSplittable(path, codecFactory)
+
+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!isSplitable(path)) {
+      "the file is compressed by an unsplittable compression codec"
+    } else {
+      null
+    }
+  }
 }
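A quick way to see the codec distinction this class relies on, e.g. in spark-shell, assuming the default Hadoop codec registrations are on the classpath:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}

val factory = new CompressionCodecFactory(new Configuration())
// gzip resolves to a codec without split support; bzip2 implements
// SplittableCompressionCodec, so bzip2 files stay splittable.
val gzip = factory.getCodec(new Path("/tmp/sample.gz"))
val bzip2 = factory.getCodec(new Path("/tmp/sample.bz2"))
println(gzip.isInstanceOf[SplittableCompressionCodec])  // false
println(bzip2.isInstanceOf[SplittableCompressionCodec]) // true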
14 changes: 13 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
+import org.apache.spark.sql.execution.datasources.csv.{CSVDataSource, MultiLineCSVDataSource}
 import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
 import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -50,6 +50,18 @@ case class CSVScan(
     CSVDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }

+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      if (!CSVDataSource(parsedOptions).isSplitable) {
+        "the CSV datasource is set to multiLine mode"
+      } else {
+        ""
+      }
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
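A hypothetical read that exercises the CSV branch above, e.g. in spark-shell: with multiLine enabled the CSV source cannot split any file, so one large input file becomes one partition and the logged reason cites multiLine mode (path and options are placeholders):

// Assumes an existing SparkSession named `spark`.
val quotes = spark.read
  .option("multiLine", "true") // records may span lines, so files can't be split
  .option("header", "true")
  .csv("/data/big-quotes.csv") // placeholder path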
12 changes: 12 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -50,6 +50,18 @@ case class JsonScan(
     JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }

+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      if (!JsonDataSource(parsedOptions).isSplitable) {
+        "the JSON datasource is set to multiLine mode"
+      } else {
+        ""
+      }
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
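The JSON branch mirrors the CSV one; a multiLine read of a single large file would then log a message of the shape defined in the FileScan hunk above (file name and log prefix illustrative):

// Assumes an existing SparkSession named `spark`.
val events = spark.read
  .option("multiLine", "true") // whole-document JSON, not line-delimited
  .json("/data/big-events.json") // placeholder path

// Expected driver log, combining the FileScan message with this reason:
//   WARN ... Loading one large unsplittable file .../big-events.json with only
//   one partition, the reason is: the JSON datasource is set to multiLine mode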
12 changes: 12 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
@@ -44,6 +44,18 @@ case class TextScan(
     super.isSplitable(path) && !textOptions.wholeText
   }

+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      if (textOptions.wholeText) {
+        "the text datasource is set to wholetext mode"
+      } else {
+        null
+      }
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     assert(
       readDataSchema.length <= 1,
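And the text case: wholetext mode maps each file to a single row, which forces one partition per file regardless of codec, so a large file triggers the warning with the wholetext reason (placeholder path):

// Assumes an existing SparkSession named `spark`.
val doc = spark.read
  .option("wholetext", "true") // one row per file, so no splitting
  .text("/data/big-document.txt") // placeholder path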
