diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7c332fdb85721..b55412b3c3ae7 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1180,6 +1180,14 @@ package object config {
       .intConf
       .createWithDefault(1)
 
+  private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
+    ConfigBuilder("spark.io.warning.largeFileThreshold")
+      .internal()
+      .doc("When Spark loads a single large file whose size exceeds this threshold, " +
+        "log a warning with the possible reasons.")
+      .longConf
+      .createWithDefault(1024 * 1024 * 1024)
+
   private[spark] val EVENT_LOG_COMPRESSION_CODEC =
     ConfigBuilder("spark.eventLog.compression.codec")
       .doc("The codec used to compress event log. By default, Spark provides four codecs: " +
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3974580cfaa11..eea3c697cf219 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -25,6 +25,7 @@ import scala.collection.immutable.Map
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType
@@ -40,7 +41,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -207,6 +208,21 @@ class HadoopRDD[K, V](
     } else {
       allInputSplits
     }
+    if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
+      val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
+      val path = fileSplit.getPath
+      if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
+        val codecFactory = new CompressionCodecFactory(jobConf)
+        if (Utils.isFileSplittable(path, codecFactory)) {
+          logWarning(s"Loading one large file ${path.toString} with only one partition, " +
+            s"the partition number can be increased by setting the `minPartitions` argument " +
+            "of the `sc.textFile` method")
+        } else {
+          logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+            s"partition, because the file is compressed by an unsplittable compression codec.")
+        }
+      }
+    }
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 80d70a1d48504..faacb14419fc9 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -51,6 +51,7 @@ import com.google.common.net.InetAddresses
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.eclipse.jetty.util.MultiException
@@ -2895,6 +2896,12 @@ private[spark] object Utils extends Logging {
   def isLocalUri(uri: String): Boolean = {
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
+
+  /** Check whether the file at the given path is splittable. */
+  def isFileSplittable(path: Path, codecFactory: CompressionCodecFactory): Boolean = {
+    val codec = codecFactory.getCodec(path)
+    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index b2f3c4d256448..0438bd0430da1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -21,22 +21,24 @@ import java.util.{Locale, OptionalLong}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
 abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
-    readPartitionSchema: StructType) extends Scan with Batch with SupportsReportStatistics {
+    readPartitionSchema: StructType)
+  extends Scan
+  with Batch with SupportsReportStatistics with Logging {
   /**
    * Returns whether a file with `path` could be split or not.
    */
@@ -44,6 +46,15 @@ abstract class FileScan(
     false
   }
 
+  /**
+   * If a file with `path` is unsplittable, return the reason why it cannot be split;
+   * the default implementation returns "undefined".
+   */
+  def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    "undefined"
+  }
+
   override def description(): String = {
     val locationDesc =
       fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
@@ -91,6 +102,16 @@ abstract class FileScan(
         )
       }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
     }
+
+    if (splitFiles.length == 1) {
+      val path = new Path(splitFiles(0).filePath)
+      if (!isSplitable(path) && splitFiles(0).length >
+        sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
+        logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+          s"partition, the reason is: ${getFileUnSplittableReason(path)}")
+      }
+    }
+
     FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
index d6b84dcdfd15d..7ddd99a0293b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
@@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 abstract class TextBasedFileScan(
     sparkSession: SparkSession,
@@ -33,14 +34,13 @@ abstract class TextBasedFileScan(
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap)
   extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
-  private var codecFactory: CompressionCodecFactory = _
+  @transient private lazy val codecFactory: CompressionCodecFactory = new CompressionCodecFactory(
+    sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
 
-  override def isSplitable(path: Path): Boolean = {
-    if (codecFactory == null) {
-      codecFactory = new CompressionCodecFactory(
-        sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
-    }
-    val codec = codecFactory.getCodec(path)
-    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
+  override def isSplitable(path: Path): Boolean = Utils.isFileSplittable(path, codecFactory)
+
+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    "the file is compressed by an unsplittable compression codec"
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index 5bc8029b4068a..3cbcfca01a9c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
+import org.apache.spark.sql.execution.datasources.csv.{CSVDataSource, MultiLineCSVDataSource}
 import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
 import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -50,6 +50,15 @@ case class CSVScan(
     CSVDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }
 
+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      "the csv datasource is set to multiLine mode"
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
index 201572b4338b6..5c41bbd931982 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -50,6 +50,15 @@ case class JsonScan(
     JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }
 
+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      "the json datasource is set to multiLine mode"
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
index 202723db27421..89b0511442d4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
@@ -44,6 +44,15 @@ case class TextScan(
     super.isSplitable(path) && !textOptions.wholeText
   }
 
+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      "the text datasource is set to wholetext mode"
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     assert(
       readDataSchema.length <= 1,
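
Review aid, not part of the patch: a minimal sketch of how the new warning can be exercised locally once the change above is applied. The file paths, the lowered threshold value, and the object name are made-up examples; only `spark.io.warning.largeFileThreshold`, the `minPartitions` argument of `sc.textFile`, and the `multiLine` read option come from the patch and existing Spark APIs.

// Sketch only: assumes the patch above is applied; paths and values are illustrative.
import org.apache.spark.sql.SparkSession

object LargeFileWarningDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("large-file-warning-demo")
      .master("local[*]")
      // Lower the internal threshold (default 1 GiB) to 10 MiB so a modest test file triggers it.
      .config("spark.io.warning.largeFileThreshold", "10485760")
      .getOrCreate()
    val sc = spark.sparkContext

    // Splittable case: a single plain-text file over the threshold that is read as one
    // partition makes HadoopRDD log the hint about the `minPartitions` argument.
    sc.textFile("/tmp/big.txt").count()
    // Asking for more partitions up front avoids the single-partition read (and the warning).
    sc.textFile("/tmp/big.txt", minPartitions = 8).count()

    // Unsplittable case: a gzip file cannot be split, so the warning points at the codec.
    sc.textFile("/tmp/big.json.gz").count()

    // DSv2 path: with multiLine enabled the JSON scan is unsplittable, and FileScan logs
    // the reason returned by getFileUnSplittableReason (this requires the v2 JSON source
    // to be the active implementation for the session).
    spark.read.option("multiLine", "true").json("/tmp/big.json").count()

    spark.stop()
  }
}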