[SPARK-28366][CORE] Logging in driver when loading single large unsplittable file #25134
core/src/main/scala/org/apache/spark/internal/config/package.scala

@@ -1180,6 +1180,14 @@ package object config {
       .intConf
       .createWithDefault(1)
 
+  private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
+    ConfigBuilder("spark.io.warning.largeFileThreshold")
+      .internal()
+      .doc("When Spark loads a single large file whose size exceeds this " +
+        "threshold, log a warning with the possible reasons.")
+      .longConf
+      .createWithDefault(1024 * 1024 * 1024)
+
   private[spark] val EVENT_LOG_COMPRESSION_CODEC =
     ConfigBuilder("spark.eventLog.compression.codec")
       .doc("The codec used to compress event log. By default, Spark provides four codecs: " +

Review comment (on `.longConf`): Please update it to

Review comment: I don't think it's worth adding a config. It looks like overkill.

Review comment: The problem is, this warning is trivial and not actually important. We can just pick any reasonable number. Who will configure this? I won't. This information shouldn't be job-based, either.

Reply: People may set it to Long.MaxValue to disable the warning. Besides, an internal config doesn't hurt; we have many internal configs that users will never set.
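To make the review discussion concrete, here is a minimal sketch of how the new internal config could be overridden, e.g. raised to `Long.MaxValue` to silence the warning as suggested above. Only the config key and default come from the diff; the app name, master, and file path are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Internal config, so it is set programmatically rather than documented;
// raising the threshold to Long.MaxValue effectively disables the warning.
val conf = new SparkConf()
  .setAppName("large-file-threshold-demo") // hypothetical app name
  .setMaster("local[*]")
  .set("spark.io.warning.largeFileThreshold", Long.MaxValue.toString)

val sc = new SparkContext(conf)

// A single gzip file above the default 1 GiB threshold would normally trigger
// the new warning in HadoopRDD; with the raised threshold it stays silent.
val lines = sc.textFile("/data/big-dump.json.gz") // hypothetical path
println(lines.partitions.length) // gzip is unsplittable, so this prints 1
```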
core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

@@ -25,6 +25,7 @@ import scala.collection.immutable.Map
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType

@@ -40,7 +41,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.

@@ -207,6 +208,21 @@ class HadoopRDD[K, V](
     } else {
       allInputSplits
     }
+    if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
+      val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
+      val path = fileSplit.getPath
+      if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
+        val codecFactory = new CompressionCodecFactory(jobConf)
+        if (Utils.isFileSplittable(path, codecFactory)) {
+          logWarning(s"Loading one large file ${path.toString} with only one partition, " +
+            s"we can increase the number of partitions via the `minPartitions` argument of " +
+            "`sc.textFile`")
+        } else {
+          logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+            s"partition, because the file is compressed by an unsplittable compression codec.")
+        }
+      }
+    }
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))

Review comment (on the first `logWarning`): Is it always `sc.textFile`?
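`Utils.isFileSplittable` is a small helper added elsewhere in this PR; judging from the imports above (`CompressionCodecFactory`, `SplittableCompressionCodec`), its check is presumably along these lines (a sketch, not the exact Spark source):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}

// A file is splittable if no codec matches its extension (plain text, etc.)
// or if the matching codec supports splitting (e.g. bzip2, unlike gzip).
def isFileSplittable(path: Path, codecFactory: CompressionCodecFactory): Boolean = {
  val codec = codecFactory.getCodec(path)
  codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}
```

And the remedy the first warning points to, for the splittable case (hypothetical path and partition count):

```scala
// Ask Hadoop for at least 16 splits instead of the default minimum,
// so a single large uncompressed file no longer lands in one partition.
val lines = sc.textFile("/data/big.log", minPartitions = 16)
```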
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

@@ -21,29 +21,40 @@ import java.util.{Locale, OptionalLong}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
-    readPartitionSchema: StructType) extends Scan with Batch with SupportsReportStatistics {
+    readPartitionSchema: StructType)
+  extends Scan
+    with Batch with SupportsReportStatistics with Logging {
   /**
    * Returns whether a file with `path` could be split or not.
    */
   def isSplitable(path: Path): Boolean = {
     false
   }
 
+  /**
+   * If a file with `path` is unsplittable, returns the reason it cannot be split;
+   * must only be called when `isSplitable(path)` is false.
+   */
+  def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    "undefined"
+  }
+
   override def description(): String = {
     val locationDesc =
       fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")

@@ -91,6 +102,16 @@ abstract class FileScan(
         )
       }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
     }
 
+    if (splitFiles.length == 1) {
+      val path = new Path(splitFiles(0).filePath)
+      if (!isSplitable(path) && splitFiles(0).length >
+        sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
+        logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+          s"partition, the reason is: ${getFileUnSplittableReason(path)}")
+      }
+    }
+
     FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes)
   }

Review comment (on `getFileUnSplittableReason`): @cloud-fan, is it really worth exposing another internal API in our common source trait?
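To show how a concrete source could use the new hook, here is a self-contained sketch that mirrors the two-method contract outside Spark's class hierarchy; the trait name, class name, and path are invented for illustration, and format-specific scans in the PR would override these methods on `FileScan` itself.

```scala
import org.apache.hadoop.fs.Path

// Mirrors the FileScan contract from the diff: an "undefined" default reason
// that concrete scans override together with isSplitable.
trait SplittabilityAware {
  def isSplitable(path: Path): Boolean = false

  def getFileUnSplittableReason(path: Path): String = {
    assert(!isSplitable(path))
    "undefined"
  }
}

// A hypothetical text-like scan: splittable unless the extension suggests an
// unsplittable codec such as gzip.
class GzipAwareScan extends SplittabilityAware {
  override def isSplitable(path: Path): Boolean = !path.getName.endsWith(".gz")

  override def getFileUnSplittableReason(path: Path): String = {
    assert(!isSplitable(path))
    "the file is compressed by an unsplittable compression codec"
  }
}

object SplittabilityDemo extends App {
  val scan = new GzipAwareScan
  val path = new Path("/data/events.json.gz") // hypothetical path
  if (!scan.isSplitable(path)) {
    // Same shape as the warning emitted by FileScan.partitions() above.
    println(s"Loading one large unsplittable file $path with only one " +
      s"partition, the reason is: ${scan.getFileUnSplittableReason(path)}")
  }
}
```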
Review comment: Please update the description to