[SPARK-28366][CORE] Logging in driver when loading single large unsplittable file #25134

Closed
wants to merge 13 commits
@@ -1180,6 +1180,14 @@ package object config {
.intConf
.createWithDefault(1)

private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
ConfigBuilder("spark.io.warning.largeFileThreshold")
.internal()
.doc("When spark loading one single large file, if file size exceed this " +
@gatorsmile (Member), Nov 26, 2019:
Please update the description to

If the size in bytes of a file loaded by Spark exceeds this threshold, a warning is logged with the possible reasons.

"threshold, then log warning with possible reasons.")
.longConf
@gatorsmile (Member), Nov 26, 2019:

Please update it to .bytesConf(ByteUnit.BYTE)
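For illustration only, the config definition with both of the suggestions above applied might look roughly like the sketch below (not the merged code; it assumes `ByteUnit` from `org.apache.spark.network.util` is imported in this file):

```scala
private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
  ConfigBuilder("spark.io.warning.largeFileThreshold")
    .internal()
    .doc("If the size in bytes of a file loaded by Spark exceeds this threshold, " +
      "a warning is logged with the possible reasons.")
    .bytesConf(ByteUnit.BYTE)  // accepts values like "1g" or plain byte counts
    .createWithDefault(1024 * 1024 * 1024)
```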

.createWithDefault(1024 * 1024 * 1024)
Member:
I don't think it's worth adding a config. It looks like overkill.

@cloud-fan (Contributor), Aug 2, 2019:
  1. this is an internal config.
  2. "large file" is vague, and I don't think we can hardcode a value and say that's "large file".

Member:
The problem is, this warning is trivial and not actually important.

We can just pick any reasonable number. Who will configure this? I won't. And this information shouldn't be job-based, either.

Contributor:
People may set it to Long.Max to disable the warning. Besides, an internal config doesn't hurt. I think we have many internal configs that users will never set.
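As a usage illustration of that point (hypothetical application code, not part of this patch), a job that wants to silence the warning could raise the threshold before creating the context:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical: raise the threshold so high that the warning is effectively disabled.
val conf = new SparkConf()
  .setAppName("quiet-large-file-loads")
  .set("spark.io.warning.largeFileThreshold", Long.MaxValue.toString)
val sc = new SparkContext(conf)
```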


private[spark] val EVENT_LOG_COMPRESSION_CODEC =
ConfigBuilder("spark.eventLog.compression.codec")
.doc("The codec used to compress event log. By default, Spark provides four codecs: " +
18 changes: 17 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -25,6 +25,7 @@ import scala.collection.immutable.Map
import scala.reflect.ClassTag

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.mapreduce.TaskType
@@ -40,7 +41,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}
import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}

/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -207,6 +208,21 @@ class HadoopRDD[K, V](
} else {
allInputSplits
}
if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
val path = fileSplit.getPath
if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
val codecFactory = new CompressionCodecFactory(jobConf)
if (Utils.isFileSplittable(path, codecFactory)) {
logWarning(s"Loading one large file ${path.toString} with only one partition, " +
Member:
nit: toString isn't needed here since this is string interpolation.

s"we can increase partition numbers by the `minPartitions` argument in method " +
Member:
nit: and the s prefix isn't needed either.

"`sc.textFile`")
Member:
Is it always sc.textFile? Many DataSource V1 implementations still use hadoopFile or newHadoopFile.
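For context, a sketch of what the suggested hint looks like from user code (hypothetical paths, assuming an active `SparkContext` named `sc`); the same minimum-partitions argument also exists on the lower-level `sc.hadoopFile` API that many DataSource V1 paths use:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Ask for at least 64 partitions when reading one large, splittable file.
val lines = sc.textFile("hdfs:///data/big.log", minPartitions = 64)

// The equivalent hint on the lower-level Hadoop-file API.
val records = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/big.log", 64)
```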

} else {
logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
s"partition, because the file is compressed by unsplittable compression codec.")
}
}
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
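To make the second branch above concrete, here is a rough illustration (hypothetical path and file size; the warning text is the one added by this patch): a single gzip file cannot be split, so the whole file becomes one partition regardless of `minPartitions`.

```scala
// One ~2 GB gzip-compressed log file; gzip is not a SplittableCompressionCodec.
val rdd = sc.textFile("hdfs:///logs/huge-2019-07.log.gz")
rdd.getNumPartitions  // 1

// On the driver, getPartitions would then log (roughly):
// WARN HadoopRDD: Loading one large unsplittable file hdfs:///logs/huge-2019-07.log.gz
// with only one partition, because the file is compressed by unsplittable compression codec.
```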
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -51,6 +51,7 @@ import com.google.common.net.InetAddresses
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.eclipse.jetty.util.MultiException
@@ -2895,6 +2896,12 @@ private[spark] object Utils extends Logging {
def isLocalUri(uri: String): Boolean = {
uri.startsWith(s"$LOCAL_SCHEME:")
}

/** Check whether the file of the path is splittable. */
def isFileSplittable(path: Path, codecFactory: CompressionCodecFactory): Boolean = {
val codec = codecFactory.getCodec(path)
codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}
}

private[util] object CallerContext extends Logging {
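A usage sketch for the new helper above, callable from inside Spark since `Utils` is `private[spark]` (hypothetical paths; by default `CompressionCodecFactory` knows about gzip, which is not splittable, and bzip2, which is):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.spark.util.Utils

val codecFactory = new CompressionCodecFactory(new Configuration())

Utils.isFileSplittable(new Path("/data/events.json"), codecFactory)     // true: no codec matches
Utils.isFileSplittable(new Path("/data/events.json.gz"), codecFactory)  // false: GzipCodec is unsplittable
Utils.isFileSplittable(new Path("/data/events.json.bz2"), codecFactory) // true: BZip2Codec is splittable
```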
@@ -21,29 +21,40 @@ import java.util.{Locale, OptionalLong}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

abstract class FileScan(
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
readDataSchema: StructType,
readPartitionSchema: StructType) extends Scan with Batch with SupportsReportStatistics {
readPartitionSchema: StructType)
extends Scan
with Batch with SupportsReportStatistics with Logging {
/**
* Returns whether a file with `path` could be split or not.
*/
def isSplitable(path: Path): Boolean = {
false
}

/**
* If a file with `path` is unsplittable, return the unsplittable reason,
* otherwise return `None`.
*/
def getFileUnSplittableReason(path: Path): String = {
@HyukjinKwon (Member), Aug 2, 2019:
@cloud-fan, is it really worth exposing another internal API in our common source trait?

Contributor:
We have isSplittable and it makes sense to explain why it's unsplittable. Maybe there is a way to merge these 2 methods, but I can't think of one now.
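Purely as a sketch of the idea being discussed (not something this patch proposes), the two methods could conceivably be folded into one that returns the reason only when the file is not splittable:

```scala
/**
 * Hypothetical merged API: None means the file at `path` can be split,
 * Some(reason) explains why it cannot. Default: unsplittable, reason unknown.
 */
def unsplittableReason(path: Path): Option[String] = Some("undefined")

// A text-based scan could then override it in one place, e.g.:
// override def unsplittableReason(path: Path): Option[String] =
//   if (Utils.isFileSplittable(path, codecFactory)) None
//   else Some("the file is compressed by an unsplittable compression codec")
```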

assert(!isSplitable(path))
"undefined"
}

override def description(): String = {
val locationDesc =
fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
@@ -91,6 +102,16 @@ abstract class FileScan(
)
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
}

if (splitFiles.length == 1) {
val path = new Path(splitFiles(0).filePath)
if (!isSplitable(path) && splitFiles(0).length >
sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
s"partition, the reason is: ${getFileUnSplittableReason(path)}")
}
}

FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes)
}

@@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.io.compress.CompressionCodecFactory

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

abstract class TextBasedFileScan(
sparkSession: SparkSession,
@@ -33,14 +34,13 @@ abstract class TextBasedFileScan(
readPartitionSchema: StructType,
options: CaseInsensitiveStringMap)
extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
private var codecFactory: CompressionCodecFactory = _
@transient private lazy val codecFactory: CompressionCodecFactory = new CompressionCodecFactory(
sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))

override def isSplitable(path: Path): Boolean = {
if (codecFactory == null) {
codecFactory = new CompressionCodecFactory(
sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
}
val codec = codecFactory.getCodec(path)
codec == null || codec.isInstanceOf[SplittableCompressionCodec]
override def isSplitable(path: Path): Boolean = Utils.isFileSplittable(path, codecFactory)

override def getFileUnSplittableReason(path: Path): String = {
assert(!isSplitable(path))
"the file is compressed by unsplittable compression codec"
}
}
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
import org.apache.spark.sql.execution.datasources.csv.{CSVDataSource, MultiLineCSVDataSource}
import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
import org.apache.spark.sql.types.{DataType, StructType}
@@ -50,6 +50,15 @@ case class CSVScan(
CSVDataSource(parsedOptions).isSplitable && super.isSplitable(path)
}

override def getFileUnSplittableReason(path: Path): String = {
assert(!isSplitable(path))
if (!super.isSplitable(path)) {
super.getFileUnSplittableReason(path)
} else {
"the csv datasource is set multiLine mode"
}
}

override def createReaderFactory(): PartitionReaderFactory = {
// Check a field requirement for corrupt records here to throw an exception in a driver side
ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
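For reference, the unsplittable reason added above for CSV is triggered by a reader option; a hypothetical read that would hit it (assuming an active `SparkSession` named `spark`) looks like the following, and the JSON `multiLine` and text `wholetext` options behave analogously:

```scala
// Records may span multiple lines, so each file must be parsed as a whole
// and the scan cannot split it.
val df = spark.read
  .option("multiLine", "true")
  .csv("/data/quoted_multiline.csv")
```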
Expand Up @@ -50,6 +50,15 @@ case class JsonScan(
JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
}

override def getFileUnSplittableReason(path: Path): String = {
assert(!isSplitable(path))
if (!super.isSplitable(path)) {
super.getFileUnSplittableReason(path)
} else {
"the json datasource is set multiLine mode"
}
}

override def createReaderFactory(): PartitionReaderFactory = {
// Check a field requirement for corrupt records here to throw an exception in a driver side
ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
Expand Up @@ -44,6 +44,15 @@ case class TextScan(
super.isSplitable(path) && !textOptions.wholeText
}

override def getFileUnSplittableReason(path: Path): String = {
assert(!isSplitable(path))
if (!super.isSplitable(path)) {
super.getFileUnSplittableReason(path)
} else {
"the text datasource is set wholetext mode"
}
}

override def createReaderFactory(): PartitionReaderFactory = {
assert(
readDataSchema.length <= 1,