Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-12429][Streaming][Doc]Add Accumulator and Broadcast example for Streaming #10385

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions docs/streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,95 @@ Note that the connections in the pool should be lazily created on demand and tim

***

## Accumulator and Broadcast
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accumulators and Broadcast variables


Accumulator and Broadcast cannot be recovered from checkpoint in Streaming. If you enable checkpoint and use Accumulator or Broadcast as well, you have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say: "in Spark Streaming. If you enable checkpointing and use an Accumulator or Broadcast as well, you**'ll** have to create ..."


<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

object WordBlacklist {

@volatile private var instance: Broadcast[Seq[String]] = null

def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}

object DroppedWordsCounter {

@volatile private var instance: Accumulator[Long] = null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For scala, cant this whole thing be replaced with lazy val?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Good point.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, no. We need sc here. So it cannot be a val.


def getInstance(sc: SparkContext): Accumulator[Long] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.accumulator(0L, "WordsInBlacklistCounter")
}
}
}
instance
}
}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter += 1
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
println(output)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to give so much code. remove all the unnecessary cosmetic stuff like "mkString" and "print" and Files.append ....

Same for all the other languages.

println("Dropped " + droppedWordsCounter.value + " word(s) totally")
println("Appending to " + outputFile.getAbsolutePath)
Files.append(output + "\n", outputFile, Charset.defaultCharset())
})

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

TODO

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

TODO

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).

</div>
</div>

***

## DataFrame and SQL Operations
You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,55 @@ import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam

/**
* Counts words in text encoded with UTF8 received from the network every second.
* Use this singleton to get or register `Broadcast`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments as above.

*/
object WordBlacklist {

@volatile private var instance: Broadcast[Seq[String]] = null

def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}

/**
* Use this singleton to get or register `Accumulator`.
*/
object DroppedWordsCounter {

@volatile private var instance: Accumulator[Long] = null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above.


def getInstance(sc: SparkContext): Accumulator[Long] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.accumulator(0L, "WordsInBlacklistCounter")
}
}
}
instance
}
}

/**
* Counts words in text encoded with UTF8 received from the network every second. This example also
* shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
* they can be registered on driver failures.
*
* Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
Expand Down Expand Up @@ -75,10 +117,24 @@ object RecoverableNetworkWordCount {
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
println(counts)
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter += 1
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
println(output)
println("Dropped " + droppedWordsCounter.value + " word(s) totally")
println("Appending to " + outputFile.getAbsolutePath)
Files.append(counts + "\n", outputFile, Charset.defaultCharset())
Files.append(output + "\n", outputFile, Charset.defaultCharset())
})
ssc
}
Expand Down