[SPARK-3954][Streaming] Optimization to FileInputDStream #2811
Conversation
Can one of the admins verify this patch?
@@ -27,6 +27,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.util.TimeStampedHashMap
+import scala.collection.mutable.ArrayBuffer
Does your modification actually use ArrayBuffer? It seems to be unused.
The improvement looks good to me.
Also, would you mind creating a related JIRA and changing the title to match other PRs?
Can one of the admins verify this patch?
Has anyone taken notice of this patch?
@jerryshao: Is this patch considered unimportant? Why has no committer merged it?
Maybe they are quite busy, let me ping @tdas.
@tdas
@@ -120,14 +120,14 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](

   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
-    files.zip(fileRDDs).foreach { case (file, rdd) => {
+    val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
Exceeds 100 columns.
Does this actually improve performance?
I don't see any reference to files-size.foreach in the patch; which line are you referring to? Were you dealing with so many files that this proved to be a problem? Could you post some benchmark numbers on the improvement in speed?
I think this PR concentrates more on logical optimization than on speed. Spark used three iterations over the files sequence to build the fileRDD list, which isn't necessary.
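To make the three-loop vs. one-loop point concrete, here is a minimal, self-contained Scala sketch of the pattern under discussion. loadFile and the plain Vector are hypothetical stand-ins for newAPIHadoopFile and the resulting RDD; only the traversal structure mirrors the patch.

object FilesToRddPattern {
  // Hypothetical stand-in for context.sparkContext.newAPIHadoopFile:
  // "loads" a file as a plain collection.
  def loadFile(path: String): Vector[String] = Vector("line from " + path)

  def main(args: Array[String]): Unit = {
    val files = Seq("a.txt", "b.txt", "c.txt")

    // Before: three traversals of the files sequence.
    val dataBefore = files.map(loadFile)                  // loop 1: map
    files.zip(dataBefore).foreach { case (file, data) =>  // loop 2: zip, loop 3: foreach
      if (data.isEmpty) println("File " + file + " has no data in it.")
    }

    // After: a single traversal loads each file and checks it in the same pass.
    val dataAfter = files.map { file =>
      val data = loadFile(file)
      if (data.isEmpty) println("File " + file + " has no data in it.")
      data
    }

    assert(dataBefore == dataAfter) // same result, one loop instead of three
  }
}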
@tdas
Benchmark:
It's not a big deal, but it's a good code update nonetheless. However, I am not sure about the for...yield style.
Mind updating the code to use this style instead?
files.map(file => {})
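For reference, this is a sketch of how filesToRDD reads in that files.map(file => { ... }) style, pieced together from the diff above; the surrounding FileInputDStream[K, V, F] class is assumed, and the logError message is abbreviated rather than quoted from the source.

/** Generate one RDD from an array of files, traversing `files` only once. */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
  val fileRDDs = files.map(file => {
    val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
    if (rdd.partitions.size == 0) {
      logError("File " + file + " has no data in it.") // abbreviated message
    }
    rdd
  })
  new UnionRDD(context.sparkContext, fileRDDs)
}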
Jenkins, this is ok to test.
Test build #23173 has started for PR 2811.
Test build #23173 has finished for PR 2811.
Test PASSed.
Thanks, I have merged this!
[SPARK-3954][Streaming] Optimization to FileInputDStream

To convert files to RDDs, the Spark source looped over the files sequence three times:
1. files.map(...)
2. files.zip(fileRDDs)
3. files-size.foreach
This gets very time-consuming when there are lots of files, so this patch makes the following correction: 3 loops over the files sequence => only one loop.

Author: surq <[email protected]>

Closes #2811 from surq/SPARK-3954 and squashes the following commits:

321bbe8 [surq] updated the code style: from [for...yield] to [files.map(file=>{})]
88a2c20 [surq] Merge branch 'master' of https://github.com/apache/spark into SPARK-3954
178066f [surq] modify code's style [Exceeds 100 columns]
626ef97 [surq] remove redundant import (ArrayBuffer)
739341f [surq] promote the speed of converting files to RDDs

(cherry picked from commit ce6ed2a)
Signed-off-by: Tathagata Das <[email protected]>