From 9928ca52457d8c951727461175ce37757cf1bb05 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 18 Dec 2015 11:50:47 -0800 Subject: [PATCH 1/4] Add Accumulator and Broadcast Scala example for Streaming --- docs/streaming-programming-guide.md | 89 +++++++++++++++++++ .../RecoverableNetworkWordCount.scala | 66 ++++++++++++-- 2 files changed, 150 insertions(+), 5 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index ed6b28c282135..6c6a2231cef4c 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1415,6 +1415,95 @@ Note that the connections in the pool should be lazily created on demand and tim *** +## Accumulator and Broadcast + +Accumulator and Broadcast cannot be recovered from checkpoint in Streaming. If you enable checkpoint and use Accumulator or Broadcast as well, you have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example. + +
+
+{% highlight scala %} + +object WordBlacklist { + + @volatile private var instance: Broadcast[Seq[String]] = null + + def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { + if (instance == null) { + synchronized { + if (instance == null) { + val wordBlacklist = Seq("a", "b", "c") + instance = sc.broadcast(wordBlacklist) + } + } + } + instance + } +} + +object DroppedWordsCounter { + + @volatile private var instance: Accumulator[Long] = null + + def getInstance(sc: SparkContext): Accumulator[Long] = { + if (instance == null) { + synchronized { + if (instance == null) { + instance = sc.accumulator(0L, "WordsInBlacklistCounter") + } + } + } + instance + } +} + +wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { + // Get or register the blacklist Broadcast + val blacklist = WordBlacklist.getInstance(rdd.sparkContext) + // Get or register the droppedWordsCounter Accumulator + val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) + // Use blacklist to drop words and use droppedWordsCounter to count them + val counts = rdd.filter { case (word, count) => + if (blacklist.value.contains(word)) { + droppedWordsCounter += 1 + false + } else { + true + } + }.collect().mkString("[", ", ", "]") + val output = "Counts at time " + time + " " + counts + println(output) + println("Dropped " + droppedWordsCounter.value + " word(s) totally") + println("Appending to " + outputFile.getAbsolutePath) + Files.append(output + "\n", outputFile, Charset.defaultCharset()) +}) + +{% endhighlight %} + +See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala). +
+
+{% highlight java %} + +TODO + +{% endhighlight %} + +See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java). +
+
+{% highlight python %} + +TODO + +{% endhighlight %} + +See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py). + +
+
+ +*** + ## DataFrame and SQL Operations You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL. diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index 9916882e4f94a..cbcd908fed78f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -23,13 +23,55 @@ import java.nio.charset.Charset import com.google.common.io.Files -import org.apache.spark.SparkConf +import org.apache.spark.{Accumulator, SparkConf, SparkContext} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Time, Seconds, StreamingContext} import org.apache.spark.util.IntParam /** - * Counts words in text encoded with UTF8 received from the network every second. + * Use this singleton to get or register `Broadcast`. + */ +object WordBlacklist { + + @volatile private var instance: Broadcast[Seq[String]] = null + + def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { + if (instance == null) { + synchronized { + if (instance == null) { + val wordBlacklist = Seq("a", "b", "c") + instance = sc.broadcast(wordBlacklist) + } + } + } + instance + } +} + +/** + * Use this singleton to get or register `Accumulator`. + */ +object DroppedWordsCounter { + + @volatile private var instance: Accumulator[Long] = null + + def getInstance(sc: SparkContext): Accumulator[Long] = { + if (instance == null) { + synchronized { + if (instance == null) { + instance = sc.accumulator(0L, "WordsInBlacklistCounter") + } + } + } + instance + } +} + +/** + * Counts words in text encoded with UTF8 received from the network every second. This example also + * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that + * they can be registered on driver failures. 
 *
 * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
@@ -75,10 +117,24 @@ object RecoverableNetworkWordCount {
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
-      val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
-      println(counts)
+      // Get or register the blacklist Broadcast
+      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+      // Get or register the droppedWordsCounter Accumulator
+      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+      // Use blacklist to drop words and use droppedWordsCounter to count them
+      val counts = rdd.filter { case (word, count) =>
+        if (blacklist.value.contains(word)) {
+          droppedWordsCounter += 1
+          false
+        } else {
+          true
+        }
+      }.collect().mkString("[", ", ", "]")
+      val output = "Counts at time " + time + " " + counts
+      println(output)
+      println("Dropped " + droppedWordsCounter.value + " word(s) totally")
       println("Appending to " + outputFile.getAbsolutePath)
-      Files.append(counts + "\n", outputFile, Charset.defaultCharset())
+      Files.append(output + "\n", outputFile, Charset.defaultCharset())
     })
     ssc
   }

From 78d15bd7e343c4d6d07dcf97df76a6a8e6e8fe61 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Fri, 18 Dec 2015 15:31:00 -0800
Subject: [PATCH 2/4] Address review comments

---
 docs/streaming-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 6c6a2231cef4c..68674d538754b 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1417,7 +1417,7 @@ Note that the connections in the pool should be lazily created on demand and tim
 
 ## Accumulator and Broadcast
 
-Accumulator and Broadcast cannot be recovered from checkpoint in Streaming. If you enable checkpoint and use Accumulator or Broadcast as well, you have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example.
+Accumulator and Broadcast cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulator or Broadcast as well, you'll have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example.
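Note: the examples in this series assume the checkpoint-recovery driver pattern that `RecoverableNetworkWordCount` already uses: the `StreamingContext` is built with `StreamingContext.getOrCreate`, so the setup function runs only on a fresh start and is skipped when the context is restored from a checkpoint. That is exactly why the accumulator and broadcast handles must be resolved lazily inside `foreachRDD`. A minimal sketch of that driver pattern follows; the object name and checkpoint path are illustrative, not taken from the patch.

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedDriver {  // hypothetical name, for illustration only

  def createContext(checkpointDir: String): StreamingContext = {
    // Runs only on a fresh start. After a driver failure, the DStream graph,
    // including the foreachRDD closure shown above, is rebuilt from the
    // checkpoint and this function is never called again.
    val conf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // ... create the socket stream, word counts and foreachRDD shown above ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDir = "/tmp/recoverable-wordcount"  // illustrative path
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
    ssc.start()
    ssc.awaitTermination()
  }
}
{% endhighlight %}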
From 9e241e791e5ce2cb0cc3dd7c609339ca8ea11129 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 21 Dec 2015 10:40:19 -0800 Subject: [PATCH 3/4] Add Java and Python examples --- docs/streaming-programming-guide.md | 94 ++++++++++++++++++- .../JavaRecoverableNetworkWordCount.java | 71 +++++++++++++- .../recoverable_network_wordcount.py | 30 +++++- .../RecoverableNetworkWordCount.scala | 2 +- 4 files changed, 189 insertions(+), 8 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 68674d538754b..afe29c30a0363 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1484,7 +1484,66 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
 {% highlight java %}
 
-TODO
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (WordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+class JavaDroppedWordsCounter {
+
+  private static volatile Accumulator<Integer> instance = null;
+
+  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (DroppedWordsCounter.class) {
+        if (instance == null) {
+          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
+  @Override
+  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
+    // Get or register the blacklist Broadcast
+    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+    // Get or register the droppedWordsCounter Accumulator
+    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+    // Use blacklist to drop words and use droppedWordsCounter to count them
+    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+        if (blacklist.value().contains(wordCount._1())) {
+          droppedWordsCounter.add(wordCount._2());
+          return false;
+        } else {
+          return true;
+        }
+      }
+    }).collect().toString();
+    String output = "Counts at time " + time + " " + counts;
+    System.out.println(output);
+    System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
+    System.out.println("Appending to " + outputFile.getAbsolutePath());
+    Files.append(output + "\n", outputFile, Charset.defaultCharset());
+    return null;
+  }
+});
 
 {% endhighlight %}
 
@@ -1493,7 +1552,38 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
 {% highlight python %}
 
-TODO
+def getWordBlacklist(sparkContext):
+    if ('wordBlacklist' not in globals()):
+        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()['wordBlacklist']
+
+def getDroppedWordsCounter(sparkContext):
+    if ('droppedWordsCounter' not in globals()):
+        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+    return globals()['droppedWordsCounter']
+
+def echo(time, rdd):
+    # Get or register the blacklist Broadcast
+    blacklist = getWordBlacklist(rdd.context)
+    # Get or register the droppedWordsCounter Accumulator
+    droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+    # Use blacklist to drop words and use droppedWordsCounter to count them
+    def filterFunc(wordCount):
+        if wordCount[0] in blacklist.value:
+            droppedWordsCounter.add(wordCount[1])
+            return False
+        else:
+            return True
+
+    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
+    print(counts)
+    print("Dropped %d word(s) totally" % droppedWordsCounter.value)
+    print("Appending to " + os.path.abspath(outputPath))
+    with open(outputPath, 'a') as f:
+        f.write(counts + "\n")
+
+wordCounts.foreachRDD(echo)
 
 {% endhighlight %}
 
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index bceda97f058ea..5922de87dd9e5 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -21,17 +21,22 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
 
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
+import org.apache.spark.Accumulator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -41,7 +46,48 @@
 import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
 
 /**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Use this singleton to get or register `Broadcast`.
+ */
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (WordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+/**
+ * Use this singleton to get or register `Accumulator`.
+ */ +class JavaDroppedWordsCounter { + + private static volatile Accumulator instance = null; + + public static Accumulator getInstance(JavaSparkContext jsc) { + if (instance == null) { + synchronized (DroppedWordsCounter.class) { + if (instance == null) { + instance = jsc.accumulator(0, "WordsInBlacklistCounter"); + } + } + } + return instance; + } +} + +/** + * Counts words in text encoded with UTF8 received from the network every second. This example also + * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that + * they can be registered on driver failures. * * Usage: JavaRecoverableNetworkWordCount * and describe the TCP server that Spark Streaming would connect to receive @@ -111,10 +157,27 @@ public Integer call(Integer i1, Integer i2) { wordCounts.foreachRDD(new Function2, Time, Void>() { @Override public Void call(JavaPairRDD rdd, Time time) throws IOException { - String counts = "Counts at time " + time + " " + rdd.collect(); - System.out.println(counts); + // Get or register the blacklist Broadcast + final Broadcast> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); + // Get or register the droppedWordsCounter Accumulator + final Accumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); + // Use blacklist to drop words and use droppedWordsCounter to count them + String counts = rdd.filter(new Function, Boolean>() { + @Override + public Boolean call(Tuple2 wordCount) throws Exception { + if (blacklist.value().contains(wordCount._1())) { + droppedWordsCounter.add(wordCount._2()); + return false; + } else { + return true; + } + } + }).collect().toString(); + String output = "Counts at time " + time + " " + counts; + System.out.println(output); + System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally"); System.out.println("Appending to " + outputFile.getAbsolutePath()); - Files.append(counts + "\n", outputFile, Charset.defaultCharset()); + Files.append(output + "\n", outputFile, Charset.defaultCharset()); return null; } }); diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py index ac91f0a06b172..788cf614a8642 100644 --- a/examples/src/main/python/streaming/recoverable_network_wordcount.py +++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py @@ -44,6 +44,20 @@ from pyspark.streaming import StreamingContext +# Get or register `Broadcast` +def getWordBlacklist(sparkContext): + if ('wordBlacklist' not in globals()): + globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"]) + return globals()['wordBlacklist'] + + +# Get or register `Accumulator` +def getDroppedWordsCounter(sparkContext): + if ('droppedWordsCounter' not in globals()): + globals()['droppedWordsCounter'] = sparkContext.accumulator(0) + return globals()['droppedWordsCounter'] + + def createContext(host, port, outputPath): # If you do not see this printed, that means the StreamingContext has been loaded # from the new checkpoint @@ -60,8 +74,22 @@ def createContext(host, port, outputPath): wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) def echo(time, rdd): - counts = "Counts at time %s %s" % (time, rdd.collect()) + # Get or register the blacklist Broadcast + blacklist = getWordBlacklist(rdd.context) + # Get or register the droppedWordsCounter Accumulator + droppedWordsCounter = getDroppedWordsCounter(rdd.context) + + # Use 
blacklist to drop words and use droppedWordsCounter to count them + def filterFunc(wordCount): + if wordCount[0] in blacklist.value: + droppedWordsCounter.add(wordCount[1]) + False + else: + True + + counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect()) print(counts) + print("Dropped %d word(s) totally" % droppedWordsCounter.value) print("Appending to " + os.path.abspath(outputPath)) with open(outputPath, 'a') as f: f.write(counts + "\n") diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index cbcd908fed78f..6cdbe1ae7b184 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -124,7 +124,7 @@ object RecoverableNetworkWordCount { // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { - droppedWordsCounter += 1 + droppedWordsCounter += count false } else { true From 455968a35d22978b9f6e85e34687615f40734708 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 22 Dec 2015 15:55:44 -0800 Subject: [PATCH 4/4] Address comments --- docs/programming-guide.md | 6 ++--- docs/streaming-programming-guide.md | 26 +++++-------------- .../JavaRecoverableNetworkWordCount.java | 8 +++--- .../recoverable_network_wordcount.py | 4 +-- .../RecoverableNetworkWordCount.scala | 4 +-- 5 files changed, 17 insertions(+), 31 deletions(-) diff --git a/docs/programming-guide.md b/docs/programming-guide.md index f823b89a4b5e9..010346e1e81d0 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure. -To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail. +To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail. In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. 
Use an Accumulator instead if some global aggregation is needed. @@ -1091,7 +1091,7 @@ for details. foreach(func) - Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. + Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details. @@ -1336,7 +1336,7 @@ run on the cluster so that `v` is not shipped to the nodes more than once. In ad `v` should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later). -## Accumulators +## Accumulators Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index afe29c30a0363..3b071c7da5596 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1415,9 +1415,9 @@ Note that the connections in the pool should be lazily created on demand and tim *** -## Accumulator and Broadcast +## Accumulators and Broadcast Variables -Accumulator and Broadcast cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulator or Broadcast as well, you'll have to create lazily instantiated singleton instances for Accumulator and Broadcast so that they can be restarted on driver failures. This is shown in the following example. +[Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use [Accumulators](programming-guide.html#accumulators) or [Broadcast variables](programming-guide.html#broadcast-variables) as well, you'll have to create lazily instantiated singleton instances for [Accumulators](programming-guide.html#accumulators) and [Broadcast variables](programming-guide.html#broadcast-variables) so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.
@@ -1464,17 +1464,13 @@ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
   // Use blacklist to drop words and use droppedWordsCounter to count them
   val counts = rdd.filter { case (word, count) =>
     if (blacklist.value.contains(word)) {
-      droppedWordsCounter += 1
+      droppedWordsCounter += count
       false
     } else {
       true
     }
-  }.collect().mkString("[", ", ", "]")
+  }.collect()
   val output = "Counts at time " + time + " " + counts
-  println(output)
-  println("Dropped " + droppedWordsCounter.value + " word(s) totally")
-  println("Appending to " + outputFile.getAbsolutePath)
-  Files.append(output + "\n", outputFile, Charset.defaultCharset())
 })
 
 {% endhighlight %}
 
@@ -1490,7 +1486,7 @@ class JavaWordBlacklist {
 
   public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
     if (instance == null) {
-      synchronized (WordBlacklist.class) {
+      synchronized (JavaWordBlacklist.class) {
         if (instance == null) {
           List<String> wordBlacklist = Arrays.asList("a", "b", "c");
           instance = jsc.broadcast(wordBlacklist);
@@ -1507,7 +1503,7 @@ class JavaDroppedWordsCounter {
 
   public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
     if (instance == null) {
-      synchronized (DroppedWordsCounter.class) {
+      synchronized (JavaDroppedWordsCounter.class) {
         if (instance == null) {
           instance = jsc.accumulator(0, "WordsInBlacklistCounter");
         }
@@ -1537,11 +1533,6 @@ wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>()
       }
     }).collect().toString();
     String output = "Counts at time " + time + " " + counts;
-    System.out.println(output);
-    System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
-    System.out.println("Appending to " + outputFile.getAbsolutePath());
-    Files.append(output + "\n", outputFile, Charset.defaultCharset());
-    return null;
   }
 });
 
@@ -1577,11 +1568,6 @@ def echo(time, rdd):
             return True
 
     counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
-    print(counts)
-    print("Dropped %d word(s) totally" % droppedWordsCounter.value)
-    print("Appending to " + os.path.abspath(outputPath))
-    with open(outputPath, 'a') as f:
-        f.write(counts + "\n")
 
 wordCounts.foreachRDD(echo)
 
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index 5922de87dd9e5..90d473703ec5a 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -46,7 +46,7 @@
 import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
 
 /**
- * Use this singleton to get or register `Broadcast`.
+ * Use this singleton to get or register a Broadcast variable.
  */
 class JavaWordBlacklist {
 
@@ -54,7 +54,7 @@ class JavaWordBlacklist {
 
   public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
     if (instance == null) {
-      synchronized (WordBlacklist.class) {
+      synchronized (JavaWordBlacklist.class) {
         if (instance == null) {
           List<String> wordBlacklist = Arrays.asList("a", "b", "c");
           instance = jsc.broadcast(wordBlacklist);
@@ -66,7 +66,7 @@ public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
 }
 
 /**
- * Use this singleton to get or register `Accumulator`.
+ * Use this singleton to get or register an Accumulator.
 */
 class JavaDroppedWordsCounter {
 
@@ -74,7 +74,7 @@ class JavaDroppedWordsCounter {
 
   public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
     if (instance == null) {
-      synchronized (DroppedWordsCounter.class) {
+      synchronized (JavaDroppedWordsCounter.class) {
         if (instance == null) {
           instance = jsc.accumulator(0, "WordsInBlacklistCounter");
         }
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
index 788cf614a8642..52b2639cdf55c 100644
--- a/examples/src/main/python/streaming/recoverable_network_wordcount.py
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -44,14 +44,14 @@
 from pyspark.streaming import StreamingContext
 
 
-# Get or register `Broadcast`
+# Get or register a Broadcast variable
 def getWordBlacklist(sparkContext):
     if ('wordBlacklist' not in globals()):
         globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
     return globals()['wordBlacklist']
 
 
-# Get or register `Accumulator`
+# Get or register an Accumulator
 def getDroppedWordsCounter(sparkContext):
     if ('droppedWordsCounter' not in globals()):
         globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
     return globals()['droppedWordsCounter']
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 6cdbe1ae7b184..38d4fd11f97d1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -30,7 +30,7 @@ import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
 import org.apache.spark.util.IntParam
 
 /**
- * Use this singleton to get or register `Broadcast`.
+ * Use this singleton to get or register a Broadcast variable.
  */
 object WordBlacklist {
 
@@ -50,7 +50,7 @@ object WordBlacklist {
 }
 
 /**
- * Use this singleton to get or register `Accumulator`.
+ * Use this singleton to get or register an Accumulator.
 */
 object DroppedWordsCounter {
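Note: the reason the singletons above are resolved inside `foreachRDD`, rather than captured once when the graph is built, is that a restored checkpoint rebuilds the DStream closures but not the driver-side `Broadcast` and `Accumulator` objects they referenced. A compressed sketch of the contrast, in the Scala example's terms (illustrative only, not part of the patch; it reuses `ssc`, `wordCounts`, and the singleton objects defined in the example):

{% highlight scala %}
// Anti-pattern: a broadcast created eagerly during setup. After recovery from
// a checkpoint the setup code is skipped, so the restored closures point at a
// broadcast that no longer exists in the new driver JVM.
val staleBlacklist = ssc.sparkContext.broadcast(Seq("a", "b", "c"))

// Pattern used by these patches: resolve the handle lazily on each batch, so
// the first batch after a restart re-creates the broadcast and accumulator.
// The volatile field plus double-checked locking in getInstance keeps this
// cheap on the hot path and safe if two batches race to initialize it.
wordCounts.foreachRDD { (rdd, time) =>
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // ... filter and count as in the example ...
}
{% endhighlight %}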