Skip to content

Commit

Permalink
Update RecoverableNetworkWordCount.scala
Browse files Browse the repository at this point in the history
Ok, I've added ssc.checkpoint(checkpointDirectory) to createContext. First, I wasn't sure that the checkpoin is initiated when the context is recreated from checkpoinDirector. That's why I put it outside createContext.
  • Loading branch information
comcmipi committed Nov 8, 2014
1 parent 96fe274 commit b6d8001
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import org.apache.spark.util.IntParam

object RecoverableNetworkWordCount {

def createContext(ip: String, port: Int, outputPath: String) = {
def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {

// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
Expand All @@ -79,6 +79,7 @@ object RecoverableNetworkWordCount {
val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(checkpointDirectory)

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
Expand Down Expand Up @@ -114,9 +115,8 @@ object RecoverableNetworkWordCount {
val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => {
createContext(ip, port, outputPath)
createContext(ip, port, outputPath, checkpointDirectory)
})
ssc.checkpoint(checkpointDirectory)
ssc.start()
ssc.awaitTermination()
}
Expand Down

0 comments on commit b6d8001

Please sign in to comment.