
[SPARK-28366][CORE] Logging in driver when loading single large unsplittable file #25134

Closed
wants to merge 13 commits

Conversation

@WeichenXu123 (Contributor) commented Jul 12, 2019

What changes were proposed in this pull request?

Log a warning in the driver when loading a single large unsplittable file via `sc.textFile` or the CSV/JSON datasources.
The warning is currently triggered when all of the following hold (see the sketch after this list):

  • only one partition is generated
  • the file is unsplittable, possibly because:
    • it is compressed with an unsplittable compression codec such as gzip
    • multiLine mode is enabled in the CSV/JSON datasource
    • wholeText mode is enabled in the text datasource
  • the file size exceeds the config threshold spark.io.warning.largeFileThreshold (default: 1 GB)
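
A minimal sketch of the trigger logic, assembled from the HadoopRDD excerpts quoted in the review below (names such as `inputSplits`, `jobConf`, and `IO_WARNING_LARGEFILETHRESHOLD` are taken from those excerpts; treat this as an illustration, not the verbatim merged code):

    // Inside HadoopRDD.getPartitions (sketch): warn when the scan degenerates
    // to a single FileSplit larger than the configured threshold.
    if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
      val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
      val path = fileSplit.getPath
      if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
        val codecFactory = new CompressionCodecFactory(jobConf)
        if (Utils.isFileSplittable(path, codecFactory)) {
          // Splittable, yet still one partition: suggest raising minPartitions.
          logWarning(s"Loading one large file $path with only one partition, " +
            "we can increase partition numbers by the `minPartitions` argument " +
            "in method `sc.textFile`")
        } else {
          // Unsplittable: only one partition is possible, so explain why.
          logWarning(s"Loading one large unsplittable file $path with only one " +
            "partition, because the file is compressed by unsplittable " +
            "compression codec")
        }
      }
    }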

How was this patch tested?

Manually tested. Generate a gzip file exceeding 1 GB:

base64 -b 50 /dev/urandom | head -c 2000000000 > file1.txt
cat file1.txt | gzip > file1.gz
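(Note: `-b 50` is the BSD/macOS wrap flag for base64; with GNU coreutils on Linux the equivalent is `base64 -w 50`.)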

Then launch spark-shell and run:

sc.textFile("file:///path/to/file1.gz").count()

This prints a warning like:

WARN HadoopRDD: Loading one large unsplittable file file:/.../f1.gz with only one partition, because the file is compressed by unsplittable compression codec

Run:

sc.textFile("file:///path/to/file1.txt").count()

This prints a warning like:

WARN HadoopRDD: Loading one large file file:/.../f1.gz with only one partition, we can increase partition numbers by the `minPartitions` argument in method `sc.textFile`

Run:

spark.read.csv("file:///path/to/file1.gz").count

This prints a warning like:

WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the file is compressed by unsplittable compression codec

Run:

spark.read.option("multiLine", true).csv("file:///path/to/file1.gz").count

This prints a warning like:

WARN CSVScan: Loading one large unsplittable file file:/.../f1.gz with only one partition, the reason is: the csv datasource is set multiLine mode

The JSON and text datasources were also tested with similar cases.



@SparkQA commented Jul 12, 2019

Test build #107594 has finished for PR 25134 at commit 34a9a25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) left a comment:

I think the problem is that it's pretty commonly known that an unsplittable codec can't be processed in parallel.

Spark itself writes multiple files, one per partition, so arguably Spark users won't hit this issue often. So this logging mostly applies when we read a big file from an external source.

If we add the logging here, we should add the warning in other places too, for instance for the multiLine option of CSV and JSON.

@WeichenXu123 (Contributor, Author)

@HyukjinKwon Yeah, but some users complain that loading a large unsplittable file without any logging is confusing...

@SparkQA commented Jul 16, 2019

Test build #107749 has finished for PR 25134 at commit b48ced1.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 16, 2019

Test build #107751 has finished for PR 25134 at commit 4ee25d6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123 (Contributor, Author)

@HyukjinKwon I have now handled all the cases in which the file is unsplittable.

@WeichenXu123 changed the title from "[SPARK-28366][CORE] Logging in driver when loading single large unsplittable file via sc.textFile" to "[SPARK-28366][CORE] Logging in driver when loading single large unsplittable file" on Jul 17, 2019
@SparkQA commented Jul 17, 2019

Test build #107768 has finished for PR 25134 at commit 736587b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 18, 2019

Test build #107812 has finished for PR 25134 at commit 3da440b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
  val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
  val path = fileSplit.getPath
  if (Utils.isFileSplittable(path, codecFactory)
@cloud-fan (Contributor) commented Jul 18, 2019:

do we really need to know if it's splittable or not? If Spark is scanning files with a single giant partition, it's going to be very slow.

@WeichenXu123 (Contributor, Author) commented Jul 18, 2019:

@cloud-fan Yes, but we'd better tell the user why only one partition is generated. So I prefer:

  • If the file is unsplittable, the log tells the user that the file is unsplittable (and includes the reason).
  • If the file is splittable, the log tells the user that parallelism can be increased via the `minPartitions` argument of `sc.textFile`.

What do you think?

@WeichenXu123 (Contributor, Author):

@cloud-fan Any thoughts?

@cloud-fan (Contributor):

SGTM, let's include the reason in the message.

@SparkQA commented Jul 18, 2019

Test build #107840 has finished for PR 25134 at commit 0c2ce85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member)

Retest this please.

@SparkQA commented Jul 23, 2019

Test build #108025 has finished for PR 25134 at commit 0c2ce85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 23, 2019

Test build #108029 has finished for PR 25134 at commit e6cf714.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 23, 2019

Test build #108030 has finished for PR 25134 at commit feb8dd0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 30, 2019

Test build #108361 has finished for PR 25134 at commit 4ce0d33.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123 (Contributor, Author)

Jenkins, retest this please.

@SparkQA commented Jul 30, 2019

Test build #108370 has finished for PR 25134 at commit 4ce0d33.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

retest this please

@SparkQA commented Jul 30, 2019

Test build #108374 has finished for PR 25134 at commit 4ce0d33.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 30, 2019

Test build #108393 has finished for PR 25134 at commit 9442948.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (!isSplitable(path)) {
  Some("the file is compressed by unsplittable compression codec")
} else {
  None
Contributor:

when will we hit this branch?

@WeichenXu123 (Contributor, Author):

Removed all branches returning `None` and added an assert.
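
A rough sketch of the resulting shape, based on the excerpt above and the warning strings shown earlier (the `multiLine` accessor and exact wiring are assumptions, not the verbatim merged code):

    // Callers are expected to check isSplitable first, so instead of branches
    // returning None this method asserts unsplittability and always produces
    // a reason string.
    override def getFileUnSplittableReason(path: Path): String = {
      assert(!isSplitable(path))
      if (options.multiLine) { // hypothetical accessor for the multiLine option
        "the csv datasource is set multiLine mode"
      } else {
        "the file is compressed by unsplittable compression codec"
      }
    }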

@SparkQA commented Jul 31, 2019

Test build #108476 has finished for PR 25134 at commit 801c6e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan closed this in 26d03b6 on Aug 1, 2019
 * If a file with `path` is unsplittable, return the unsplittable reason,
 * otherwise return `None`.
 */
def getFileUnSplittableReason(path: Path): String = {
@HyukjinKwon (Member) commented Aug 2, 2019:

@cloud-fan, is it really worth exposing another internal API in our common source trait?

@cloud-fan (Contributor):

We have `isSplittable`, and it makes sense to explain why a file is unsplittable. Maybe there is a way to merge these two methods, but I can't think of one now.
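
One conceivable merged shape (purely illustrative, not something this PR implements): a single method returning `Option[String]`, where `None` means splittable and `Some(reason)` explains why not.

    // Hypothetical merged API combining isSplitable and
    // getFileUnSplittableReason into one call.
    def unsplittableReason(path: Path): Option[String]

    // A caller could then log conditionally:
    // unsplittableReason(path).foreach(r => logWarning(s"... the reason is: $r"))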

.doc("When spark loading one single large file, if file size exceed this " +
"threshold, then log warning with possible reasons.")
.longConf
.createWithDefault(1024 * 1024 * 1024)
@HyukjinKwon (Member):

I don't think it's worth adding a config; it looks like overkill.

@cloud-fan (Contributor) commented Aug 2, 2019:

  1. This is an internal config.
  2. "Large file" is vague, and I don't think we can hardcode a value and say that's a "large file".

@HyukjinKwon (Member):

The problem is that this warning is trivial and not actually important.

We can just pick any reasonable number. Who will configure this? I won't. This information shouldn't be job-based, either.

@cloud-fan (Contributor):

People may set it to `Long.MaxValue` to disable the warning. Besides, an internal config doesn't hurt; we have many internal configs that users will never set.
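
For example, assuming the internal config can be set like any other Spark property, raising the threshold to the maximum value would effectively silence the warning (illustrative only):

    import org.apache.spark.SparkConf

    // Effectively disables the large-file warning by making the
    // threshold unreachable (Long.MaxValue bytes).
    val conf = new SparkConf()
      .set("spark.io.warning.largeFileThreshold", Long.MaxValue.toString)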

if (Utils.isFileSplittable(path, codecFactory)) {
  logWarning(s"Loading one large file ${path.toString} with only one partition, " +
    s"we can increase partition numbers by the `minPartitions` argument in method " +
    "`sc.textFile`")
Member:

Is it always `sc.textFile`? Many datasource V1 implementations still use `hadoopFile` or `newHadoopFile`.

if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
  val codecFactory = new CompressionCodecFactory(jobConf)
  if (Utils.isFileSplittable(path, codecFactory)) {
    logWarning(s"Loading one large file ${path.toString} with only one partition, " +
Member:

nit: `toString` isn't needed here, since string interpolation calls it implicitly (`$path` is enough).

val codecFactory = new CompressionCodecFactory(jobConf)
if (Utils.isFileSplittable(path, codecFactory)) {
  logWarning(s"Loading one large file ${path.toString} with only one partition, " +
    s"we can increase partition numbers by the `minPartitions` argument in method " +
Member:

nit: and the `s` prefix isn't needed either, since this line interpolates nothing.

@HyukjinKwon (Member)

@cloud-fan, this looks like overkill. Can we simply mention it in the DataFrame(Reader|Writer)/DataStream(Reader|Writer) docs for our datasources?

For the Hadoop ones, the Hadoop input format (or somewhere similar) should describe it.

@cloud-fan (Contributor)

Even if we document it, how would users know which codec their data files use? It's better to give a warning before running a foreseeably long job.

@HyukjinKwon (Member) commented Aug 2, 2019

They specify it via the compression option when they write data out from Spark, and the codec is detected from the file extension. We can simply leave a note rather than a warning. This info is static rather than job-based, and users will likely know which codec was used for their files.

private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
  ConfigBuilder("spark.io.warning.largeFileThreshold")
    .internal()
    .doc("When spark loading one single large file, if file size exceed this " +
@gatorsmile (Member) commented Nov 26, 2019:

Please update the description to:

If the size in bytes of a file loaded by Spark exceeds this threshold, a warning is logged with the possible reasons.

.internal()
.doc("When spark loading one single large file, if file size exceed this " +
  "threshold, then log warning with possible reasons.")
.longConf
@gatorsmile (Member) commented Nov 26, 2019:

Please update it to `.bytesConf(ByteUnit.BYTE)`.
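
Applying both suggestions, the config definition would look roughly like this (a sketch consistent with the follow-up PR #26691 described below):

    private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
      ConfigBuilder("spark.io.warning.largeFileThreshold")
        .internal()
        .doc("If the size in bytes of a file loaded by Spark exceeds this threshold, " +
          "a warning is logged with the possible reasons.")
        .bytesConf(ByteUnit.BYTE)
        .createWithDefault(1024 * 1024 * 1024)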

@gatorsmile (Member)

cc @Ngone51 @cloud-fan

cloud-fan pushed a commit that referenced this pull request Nov 27, 2019
…HRESHOLD

### What changes were proposed in this pull request?

Improve conf `IO_WARNING_LARGEFILETHRESHOLD` (a.k.a `spark.io.warning.largeFileThreshold`):

* reword documentation

* change type from `long` to `bytes`

### Why are the changes needed?

Improvements according to #25134 (comment) & #25134 (comment).

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass Jenkins.

Closes #26691 from Ngone51/SPARK-28366-followup.

Authored-by: wuyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
attilapiros pushed a commit to attilapiros/spark that referenced this pull request on Dec 6, 2019 (same follow-up commit as above).