[SPARK-12429][Streaming][Doc]Add Accumulator and Broadcast example for Streaming #10385
***

## Accumulators and Broadcast Variables

[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use accumulators or broadcast variables as well, you'll have to create lazily instantiated singleton instances for them so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter += count
      false
    } else {
      true
    }
  }.collect()
  val output = "Counts at time " + time + " " + counts
})

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile Accumulator<Integer> instance = null;

  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
    // Get or register the blacklist Broadcast
    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
      @Override
      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
        if (blacklist.value().contains(wordCount._1())) {
          droppedWordsCounter.add(wordCount._2());
          return false;
        } else {
          return true;
        }
      }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
    return null;
  }
});

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

def getWordBlacklist(sparkContext):
    if ('wordBlacklist' not in globals()):
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']

def getDroppedWordsCounter(sparkContext):
    if ('droppedWordsCounter' not in globals()):
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']

def echo(time, rdd):
    # Get or register the blacklist Broadcast
    blacklist = getWordBlacklist(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use blacklist to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in blacklist.value:
            droppedWordsCounter.add(wordCount[1])
            return False
        else:
            return True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)

{% endhighlight %}

See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).

</div>
</div>
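
The `getInstance` helpers above rely on double-checked locking: an unsynchronized fast-path check, then a second check under a lock so that concurrent first callers create the resource exactly once. The pattern itself can be exercised without a Spark cluster. Below is a minimal Python sketch; the `LazySingleton` class is a hypothetical stand-in for holders like `WordBlacklist`, with a plain list standing in for the broadcast value:

```python
import threading

class LazySingleton:
    """Stand-in for the lazily instantiated singleton holders above."""
    _instance = None
    _lock = threading.Lock()
    creations = 0  # how many times the "expensive" creation actually ran

    @classmethod
    def get_instance(cls):
        if cls._instance is None:            # first check, no lock taken
            with cls._lock:
                if cls._instance is None:    # second check, under the lock
                    cls.creations += 1
                    # In the real examples this would be sc.broadcast(...)
                    # or sc.accumulator(...).
                    cls._instance = ["a", "b", "c"]
        return cls._instance

# Even with several threads racing on first use, creation happens once.
threads = [threading.Thread(target=LazySingleton.get_instance) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After the race, every caller sees the same instance and `creations` is 1, which is exactly the guarantee the Scala `@volatile`/`synchronized` and Java `volatile`/`synchronized` versions provide.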

***

## DataFrame and SQL Operations
You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is achieved by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
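
The lazy-singleton idea for SQLContext follows the same `globals()`-based shape as the Python helpers earlier in this section. A minimal sketch, with the hypothetical helper name `getSqlContextInstance` and a plain `object()` standing in for `SQLContext(sparkContext)` so it runs outside a Spark deployment:

```python
def getSqlContextInstance(sparkContext):
    # Lazily create the singleton on first use; after a driver restart,
    # globals() is empty again and the first batch re-creates it.
    if ('sqlContextSingletonInstance' not in globals()):
        # In real code: globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
        globals()['sqlContextSingletonInstance'] = object()
    return globals()['sqlContextSingletonInstance']

# Repeated calls (e.g. one per batch inside foreachRDD) reuse the instance.
first = getSqlContextInstance(None)
second = getSqlContextInstance(None)
```

Note this single-process version needs no locking, unlike the Scala/Java variants, because each Python driver process keeps one `globals()` dictionary and `foreachRDD` functions run in the driver.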