
[SPARK-3954][Streaming] Optimization to FileInputDStream #2811

Closed
wants to merge 5 commits

Conversation

@surq surq commented Oct 15, 2014

When converting files to RDDs, the Spark source loops over the files sequence three times:
1. files.map(...)
2. files.zip(fileRDDs)
3. files-size.foreach
This can be very time consuming when there are lots of files, so I made the following correction:
3 loops over the files sequence => only one loop
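
For context, a minimal sketch of the change in FileInputDStream.filesToRDD, simplified from the diff further down (the log message is shortened to its first sentence):

// Before: three passes over the files sequence
val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
files.zip(fileRDDs).foreach { case (file, rdd) =>
  if (rdd.partitions.size == 0) logError("File " + file + " has no data in it.")
}
new UnionRDD(context.sparkContext, fileRDDs)

// After: a single pass that creates and validates each RDD as it goes
val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
  if (rdd.partitions.size == 0) logError("File " + file + " has no data in it.")
  rdd
}
new UnionRDD(context.sparkContext, fileRDDs)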

@AmplabJenkins

Can one of the admins verify this patch?

@@ -27,6 +27,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.util.TimeStampedHashMap
import scala.collection.mutable.ArrayBuffer
Contributor

Does your modification use ArrayBuffer? It seems it is not used.

@jerryshao
Contributor

The improvement looks good to me.

@jerryshao
Contributor

Besides, would you mind creating a related JIRA and changing the title like other PRs?

@surq surq changed the title from "promote the speed of convert files to RDDS" to "[SPARK-3954][Streaming] promote the speed of convert files to RDDS" Oct 15, 2014
@AmplabJenkins

Can one of the admins verify this patch?

@surq
Author

surq commented Oct 23, 2014

Has anyone taken notice of this patch?

@surq
Author

surq commented Oct 28, 2014

@jerryshao: Is this an inessential patch? Why has no committer merged it?

@jerryshao
Contributor

Maybe they are quite busy; let me ping @tdas.

@surq
Author

surq commented Oct 28, 2014

@tdas
This patch was proposed some days ago. If you have time, please take a look at it.
@jerryshao
Thanks for your kind help.

@@ -120,14 +120,14 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas

/** Generate one RDD from an array of files */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
files.zip(fileRDDs).foreach { case (file, rdd) => {
val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
Contributor

Exceeds 100 columns.
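
For illustration, one way the new for...yield line could be wrapped to stay under 100 columns (a sketch only, not necessarily the wrapping used in the follow-up commit):

val fileRDDs = for (
  file <- files;
  rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
) yield {
  if (rdd.partitions.size == 0) {
    logError("File " + file + " has no data in it.")
  }
  rdd
}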

@rxin
Contributor

rxin commented Nov 2, 2014

Does this actually improve performance?

@tdas
Contributor

tdas commented Nov 2, 2014

I don't see any reference to file-size.foreach in the patch. Which line are you referring to? Were you dealing with so many files that this proved to be a problem? Could you post some benchmark numbers on the improvement in speed?

@tianyi
Contributor

tianyi commented Nov 4, 2014

I think this PR concentrates more on logical optimization than on speed. Spark used three iterations to get the fileRDD list, which is not necessary.

@surq surq changed the title from "[SPARK-3954][Streaming] promote the speed of convert files to RDDS" to "[SPARK-3954][Streaming] source code optimization" Nov 4, 2014
@surq
Author

surq commented Nov 4, 2014

@tdas
After running the benchmark, the time difference before and after the correction is actually very small. From a code-elegance point of view, I still think this patch is worthwhile, so the title has been changed to [source code optimization].

@surq
Author

surq commented Nov 4, 2014

Benchmark:
10000 files, run 10 times.

import org.apache.spark.Logging
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import org.apache.hadoop.mapreduce.{ InputFormat => NewInputFormat }
import org.apache.spark.rdd.UnionRDD
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import scala.sys.process._
import java.io.File
import java.util.Calendar
import java.io.FileWriter

object SPARK_3954Bench {
  // directory for the newly created test files
  val newFilePath = "/home/hadoop/test/bid/bids"
  // source file to copy
  val sourceCopyFile = "/home/hadoop/test/bid/web415-bid-k1-.1414554678231"
  val resultFile = "/home/hadoop/test/bid/result.txt"
  var newFileList: Seq[String] = _
  // writer for the test result output
  val fw = new FileWriter(resultFile)

  def main(args: Array[String]): Unit = {

    createFiles(sourceCopyFile, newFilePath, 10000)
    newFileList = getNewFileList(newFilePath)
    val ssc = new StreamingContext("local[2]", "BenchWork_test", Seconds(20), System.getenv("SPARK_HOME"))

    //----Choose one of the following test cases to test----------
    0 until 10 foreach benchMarkTest(ssc, "filesToRDD", true)
    // 0 until 10 foreach benchMarkTest(ssc, "filesToRDD_SPARK_3954", true)
    // 0 until 10 foreach benchMarkTest(ssc, "threeLoops", true)
    // 0 until 10 foreach benchMarkTest(ssc, "oneLoop", true)
  }

  /**
   * bench mark test
   * @param ssc: StreamingContext
   */
  def benchMarkTest(ssc: StreamingContext, funName: String, writeFile_flg: Boolean) = (count: Int) => {

    val bt = new BenchTest[LongWritable, Text, TextInputFormat](ssc)
    var streamingStartTime = 0L
    var streamingEndTime = 0L
    funName match {
      case "filesToRDD_SPARK_3954" => {
        streamingStartTime = Calendar.getInstance().getTimeInMillis()
        bt.filesToRDD_SPARK_3954(newFileList)
        streamingEndTime = Calendar.getInstance().getTimeInMillis()
      }
      case "filesToRDD" => {
        streamingStartTime = Calendar.getInstance().getTimeInMillis()
        bt.filesToRDD(newFileList)
        streamingEndTime = Calendar.getInstance().getTimeInMillis()
      }
      case "threeLoops" => {
        streamingStartTime = Calendar.getInstance().getTimeInMillis()
        bt.threeLoops(newFileList)
        streamingEndTime = Calendar.getInstance().getTimeInMillis()
      }
      case "oneLoop" => {
        streamingStartTime = Calendar.getInstance().getTimeInMillis()
        bt.oneLoop(newFileList)
        streamingEndTime = Calendar.getInstance().getTimeInMillis()
      }
    }
    outPint(funName, streamingStartTime, streamingEndTime, writeFile_flg)
  }

  /**
   * Copy the source file `num` times into `path`.
   * @param sourceFile: file to be copied.
   * @param path: directory for the newly created files.
   */
  def createFiles(sourceFile: String, path: String, num: Int) = 0 until num foreach (f => {
    (("cat " + sourceFile) #> new File(path + "/copy" + f)).!
  })

  /**
   * List the created files under `path`.
   * @param path: directory containing the newly created files.
   */
  def getNewFileList(path: String) = {
    val dir = new File(path)
    val list = for (file <- dir.listFiles) yield file.getAbsoluteFile().toString()
    list.toSeq
  }

  /**
   * print output result
   */
  def outPint(funName: String, startTime: Long, endTime: Long, writeFile_flg: Boolean) = {
    val contents = "Test function name:[" + funName + "] time consuming:" +
      (endTime - startTime) + "ms (" + startTime + "~" + endTime + ")"
    println(contents)
    if (writeFile_flg) {
      fw.write(contents + System.getProperty("line.separator"))
      fw.flush
    }
  }
}

class BenchTest[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](context: StreamingContext) extends Logging {
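  // filesToRDD and threeLoops traverse the files sequence three times (map, zip, foreach),
  // while filesToRDD_SPARK_3954 and oneLoop build and validate each entry in a single pass,
  // mirroring the patched FileInputDStream.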

  /**
   * patch SPARK-3954
   */
  /** Generate one RDD from an array of files */
  //  def filesToRDD_new(files: Seq[String]): RDD[(K, V)] = {
  def filesToRDD_SPARK_3954(files: Seq[String]) = {
    val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
      if (rdd.partitions.size == 0) {
        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
          "files that have been \"moved\" to the directory assigned to the file stream. " +
          "Refer to the streaming programming guide for more details.")
      }
      rdd
    }
    new UnionRDD(context.sparkContext, fileRDDs)
  }

  /** Generate one RDD from an array of files */
  //  def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
  def filesToRDD(files: Seq[String]) = {
    val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
    files.zip(fileRDDs).foreach {
      case (file, rdd) => {
        if (rdd.partitions.size == 0) {
          logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
            "files that have been \"moved\" to the directory assigned to the file stream. " +
            "Refer to the streaming programming guide for more details.")
        }
      }
    }
    new UnionRDD(context.sparkContext, fileRDDs)
  }

  /**
   * three recursions Test.
   */
  def threeLoops(files: Seq[String]) = {

    val fileRDDs = files.map(file => file)
    files.zip(fileRDDs).foreach {
      case (file, rdd) => {
        if (rdd.size == 0) {
          logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
            "files that have been \"moved\" to the directory assigned to the file stream. " +
            "Refer to the streaming programming guide for more details.")
        }
      }
    }
  }

  /**
   * only one recursion Test.
   */
  def oneLoop(files: Seq[String]) = {
    val fileRDDs = for (file <- files; rdd = file) yield {
      if (rdd.size == 0) {
        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
          "files that have been \"moved\" to the directory assigned to the file stream. " +
          "Refer to the streaming programming guide for more details.")
      }
      rdd
    }
  }
}

@surq surq changed the title from "[SPARK-3954][Streaming] source code optimization" to "[SPARK-3954][Streaming] Optimization to FileInputDStream" Nov 5, 2014
@tdas
Contributor

tdas commented Nov 7, 2014

It's not a big deal, but it's a good code update nonetheless. However, I am not sure for...yield is used much in the code base. So a better approach would be to do the following:

val fileRDDs = files.map { file =>
  val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
  if (rdd.partitions.size == 0) { ... }
  rdd
}

Mind updating the code to use this style?

@tdas
Contributor

tdas commented Nov 11, 2014

Jenkins, this is ok to test.

@SparkQA

SparkQA commented Nov 11, 2014

Test build #23173 has started for PR 2811 at commit 321bbe8.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 11, 2014

Test build #23173 has finished for PR 2811 at commit 321bbe8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23173/

@tdas
Contributor

tdas commented Nov 11, 2014

Thanks, I have merged this!

@asfgit asfgit closed this in ce6ed2a Nov 11, 2014
asfgit pushed a commit that referenced this pull request Nov 11, 2014
When converting files to RDDs, the Spark source loops over the files sequence three times:
1. files.map(...)
2. files.zip(fileRDDs)
3. files-size.foreach
This can be very time consuming when there are lots of files, so I made the following correction:
3 loops over the files sequence => only one loop

Author: surq <[email protected]>

Closes #2811 from surq/SPARK-3954 and squashes the following commits:

321bbe8 [surq] updated the code style. The style from [for...yield] to [files.map(file=>{})]
88a2c20 [surq] Merge branch 'master' of https://github.com/apache/spark into SPARK-3954
178066f [surq] modify code's style. [Exceeds 100 columns]
626ef97 [surq] remove redundant import(ArrayBuffer)
739341f [surq] promote the speed of convert files to RDDS

(cherry picked from commit ce6ed2a)
Signed-off-by: Tathagata Das <[email protected]>