diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 7e1df97a9f9fd..c763eabe8abba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -210,13 +210,13 @@ class HadoopRDD[K, V](
     } else {
       allInputSplits
     }
-    if (inputSplits.length == 1) {
+    if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
       val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
       val path = fileSplit.getPath
       if (!Utils.isFileSplittable(path, codecFactory) && fileSplit.getLength >
           conf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
-        logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
-          s"rdd partition have to deal with the whole file and consume large time.")
+        logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+          s"partition, because the file is compressed by unsplittable compression codec.")
       }
     }
     val array = new Array[Partition](inputSplits.size)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index d2e6731eb533a..c8a9a7fd23c2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -46,6 +46,10 @@ abstract class FileScan(
     false
   }
 
+  def getFileUnSplittableReason(path: Path): String = {
+    "Unknown"
+  }
+
   override def description(): String = {
     val locationDesc =
       fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
@@ -98,8 +102,8 @@ abstract class FileScan(
       val path = new Path(splitFiles(0).filePath)
       if (!isSplitable(path) && splitFiles(0).length >
           sparkSession.sparkContext.getConf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
-        logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
-          s"rdd partition have to deal with the whole file and consume large time.")
+        logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+          s"partition, the reason is: ${getFileUnSplittableReason(path)}")
       }
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
index e85af891e3e11..6271be9de1fda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
@@ -38,4 +38,12 @@ abstract class TextBasedFileScan(
     sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
 
   override def isSplitable(path: Path): Boolean = Utils.isFileSplittable(path, codecFactory)
+
+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!isSplitable(path)) {
+      "the file is compressed by unsplittable compression codec"
+    } else {
+      null
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index 5bc8029b4068a..71cb45842446b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
+import org.apache.spark.sql.execution.datasources.csv.{CSVDataSource, MultiLineCSVDataSource}
 import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
 import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -50,6 +50,18 @@ case class CSVScan(
     CSVDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }
 
+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      if (!CSVDataSource(parsedOptions).isSplitable) {
+        "the csv datasource is set multiLine mode"
+      } else {
+        ""
+      }
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
index 201572b4338b6..c4a7f5f8ddc1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -50,6 +50,18 @@ case class JsonScan(
     JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }
 
+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      if (!JsonDataSource(parsedOptions).isSplitable) {
+        "the json datasource is set multiLine mode"
+      } else {
+        ""
+      }
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
index 202723db27421..1f9bf78ad7f5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
@@ -44,6 +44,18 @@ case class TextScan(
     super.isSplitable(path) && !textOptions.wholeText
   }
 
+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      if (textOptions.wholeText) {
+        "the text datasource is set wholetext mode"
+      } else {
+        null
+      }
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     assert(
       readDataSchema.length <= 1,
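
Reviewer note: the runnable Scala sketch below illustrates the override chain this patch sets up. Scan, TextBasedScan, CsvScan, and warnIfNeeded are simplified stand-ins (assumptions for illustration, not Spark API) for FileScan, TextBasedFileScan, CSVScan, and the driver-side warning. Each subclass defers to its parent for the codec-based reason before reporting its own format-specific reason, which is why the guards test super.isSplitable rather than the dynamically dispatched isSplitable.

object UnsplittableReasonDemo {

  abstract class Scan {
    // Base class: nothing splittable by default, reason unknown.
    def isSplitable(path: String): Boolean = false
    def getFileUnSplittableReason(path: String): String = "Unknown"
  }

  // Text-based formats: splittable unless the codec is unsplittable (".gz" here).
  class TextBasedScan extends Scan {
    override def isSplitable(path: String): Boolean = !path.endsWith(".gz")
    override def getFileUnSplittableReason(path: String): String =
      if (!isSplitable(path)) "the file is compressed by unsplittable compression codec"
      else ""
  }

  // CSV: additionally unsplittable when multiLine parsing is enabled.
  class CsvScan(multiLine: Boolean) extends TextBasedScan {
    override def isSplitable(path: String): Boolean =
      !multiLine && super.isSplitable(path)
    override def getFileUnSplittableReason(path: String): String =
      if (!super.isSplitable(path)) super.getFileUnSplittableReason(path)
      else if (multiLine) "the csv datasource is set multiLine mode"
      else ""
  }

  // Mirrors the patched logWarning call sites: warn only for large unsplittable files.
  def warnIfNeeded(scan: Scan, path: String, length: Long, threshold: Long): Unit =
    if (!scan.isSplitable(path) && length > threshold)
      println(s"Loading one large unsplittable file $path with only one " +
        s"partition, the reason is: ${scan.getFileUnSplittableReason(path)}")

  def main(args: Array[String]): Unit = {
    val threshold = 1L << 30 // illustrative threshold: 1 GiB
    warnIfNeeded(new CsvScan(multiLine = false), "big.csv.gz", 2L << 30, threshold) // codec reason
    warnIfNeeded(new CsvScan(multiLine = true), "big.csv", 2L << 30, threshold)     // multiLine reason
    warnIfNeeded(new CsvScan(multiLine = false), "big.csv", 2L << 30, threshold)    // splittable: silent
  }
}

The first two calls print a warning with the codec-based and multiLine-based reasons respectively; the third is silent because the file is splittable. The same dispatch order keeps each reason attached to the layer that caused the unsplittability.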