
Commit

responded to TD's comments
James Thomas authored and committed Jun 21, 2016
1 parent 38b5497 commit 18c83b1
Showing 3 changed files with 70 additions and 26 deletions.
examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java

@@ -19,6 +19,7 @@

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.regex.Pattern;

@@ -34,7 +35,7 @@
* `$ bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount
* localhost 9999 <checkpoint dir>`
*/
-public class JavaStructuredNetworkWordCount {
+public final class JavaStructuredNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {
@@ -43,26 +44,40 @@ public static void main(String[] args) throws Exception {
System.exit(1);
}

+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    String checkpointDir = args[2];
+
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCount")
.getOrCreate();

-    Dataset<String> df = spark.readStream().format("socket").option("host", args[0])
-        .option("port", args[1]).load().as(Encoders.STRING());

-    Dataset<String> words = df.select(functions.explode(functions.split(df.col("value"), " ")).alias("word"))
+    // input lines (may be multiple words on each line)
+    Dataset<String> lines = spark
+      .readStream()
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .load()
+      .as(Encoders.STRING());
+
+    // input words
+    Dataset<String> words = lines.select(
+      functions.explode(
+        functions.split(lines.col("value"), " ")
+      ).alias("word")
+    ).as(Encoders.STRING());

+    // the count for each distinct word
Dataset<Row> wordCounts = words.groupBy("word").count();

-    wordCounts.writeStream()
-      .outputMode(OutputMode.Complete())
+    StreamingQuery query = wordCounts.writeStream()
+      .outputMode("complete")
       .format("console")
-      .option("checkpointLocation", args[2])
-      .start()
-      .awaitTermination();
+      .option("checkpointLocation", checkpointDir)
+      .start();

-    spark.stop();
+    query.awaitTermination();
}
}
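To try the revised example, the run-example command from the class javadoc can be paired with a netcat server feeding lines on the socket. A hypothetical session (the host and port match the javadoc; /tmp/checkpoint is an arbitrary checkpoint directory):

    # terminal 1: serve lines for the query to count
    $ nc -lk 9999

    # terminal 2: run the example against it
    $ bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount \
        localhost 9999 /tmp/checkpoint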
examples/src/main/python/sql/streaming/structured_network_wordcount.py

@@ -39,19 +39,38 @@
print("Usage: network_wordcount.py <hostname> <port> <checkpoint dir>", file=sys.stderr)
exit(-1)

+    host = sys.argv[1]
+    port = int(sys.argv[2])
+    checkpointDir = sys.argv[3]
+
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCount")\
.getOrCreate()

+    # input lines (may be multiple words on each line)
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .load()

-    df = spark.readStream.format('socket').option('host', sys.argv[1])\
-        .option('port', sys.argv[2]).load()
+    # input words
+    words = lines.select(\
+        explode(\
+            split(lines.value, ' ')\
+        ).alias('word')\
+    )

-    words = df.select(explode(split(df.value, ' ')).alias('word'))
+    # the count for each distinct word
wordCounts = words.groupBy('word').count()

-    wordCounts.writeStream.outputMode('complete').format('console')\
-        .option('checkpointLocation', sys.argv[3]).start().awaitTermination()
+    query = wordCounts\
+        .writeStream\
+        .outputMode('complete')\
+        .format('console')\
+        .option('checkpointLocation', checkpointDir)\
+        .start()

-    spark.stop()
+    query.awaitTermination()
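Since all three examples write to the console sink in complete output mode, every batch reprints the full running counts table rather than only the new rows. Illustratively, if "apache spark" and then "apache hadoop" were typed into the netcat session, a batch would print roughly the following (exact formatting may vary by Spark version):

    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +------+-----+
    |  word|count|
    +------+-----+
    |apache|    2|
    | spark|    1|
    |hadoop|    1|
    +------+-----+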
examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala

@@ -41,30 +41,40 @@ object StructuredNetworkWordCount {
System.exit(1)
}

+    val host = args(0)
+    val port = args(1).toInt
+    val checkpointDir = args(2)
+
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()

import spark.implicits._

-    val df = spark.readStream
+    // input lines (may be multiple words on each line)
+    val lines = spark.readStream
.format("socket")
.option("host", args(0))
.option("port", args(1))
.option("host", host)
.option("port", port)
.load().as[String]

-    val words = df.select(functions.explode(functions.split(df.col("value"), " ")).alias("word"))
+    // input words
+    val words = lines.select(
+      functions.explode(
+        functions.split(lines.col("value"), " ")
+      ).alias("word"))

+    // the count for each distinct word
val wordCounts = words.groupBy("word").count()

-    wordCounts.writeStream
-      .outputMode(OutputMode.Complete())
+    val query = wordCounts.writeStream
+      .outputMode("complete")
.format("console")
.option("checkpointLocation", args(2))
.option("checkpointLocation", checkpointDir)
.start()
-      .awaitTermination()

-    spark.stop()
+    query.awaitTermination()
}
}
// scalastyle:on println
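The change common to all three files is that start() no longer chains straight into awaitTermination(): each example now keeps the StreamingQuery handle that start() returns and waits on it as a separate step. A minimal Scala sketch of what holding the handle enables, reusing the wordCounts Dataset from the example above (the 30-second timeout is an arbitrary illustration, not part of the commit):

    import org.apache.spark.sql.streaming.StreamingQuery

    // start() returns immediately; the query runs asynchronously
    val query: StreamingQuery = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // the handle lets the driver bound its wait and stop the query
    // explicitly, instead of blocking forever
    val terminated = query.awaitTermination(30 * 1000)
    if (!terminated) {
      query.stop()
    }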
