From 38b5497ef17b0c1f1cf2a8c5731832bed06d2fc8 Mon Sep 17 00:00:00 2001
From: James Thomas
Date: Tue, 21 Jun 2016 14:00:05 -0700
Subject: [PATCH 01/10] structured streaming network word count examples

---
 .../JavaStructuredNetworkWordCount.java       | 68 ++++++++++++++++++
 .../streaming/structured_network_wordcount.py | 57 +++++++++++++++
 .../StructuredNetworkWordCount.scala          | 70 +++++++++++++++++++
 3 files changed, 195 insertions(+)
 create mode 100644 examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
 create mode 100644 examples/src/main/python/sql/streaming/structured_network_wordcount.py
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala

diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
new file mode 100644
index 0000000000000..edddad70009b1
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.streaming;
+
+
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.streaming.OutputMode;
+
+import java.util.regex.Pattern;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ *
+ * Usage: JavaStructuredNetworkWordCount <hostname> <port> <checkpoint dir>
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount
+ *    localhost 9999 <checkpoint dir>`
+ */
+public class JavaStructuredNetworkWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port> <checkpoint dir>");
+      System.exit(1);
+    }
+
+    SparkSession spark = SparkSession
+      .builder()
+      .appName("JavaStructuredNetworkWordCount")
+      .getOrCreate();
+
+    Dataset<String> df = spark.readStream().format("socket").option("host", args[0])
+      .option("port", args[1]).load().as(Encoders.STRING());
+
+    Dataset<String> words = df.select(functions.explode(functions.split(df.col("value"), " ")).alias("word"))
+      .as(Encoders.STRING());
+
+    Dataset<Row> wordCounts = words.groupBy("word").count();
+
+    wordCounts.writeStream()
+      .outputMode(OutputMode.Complete())
+      .format("console")
+      .option("checkpointLocation", args[2])
+      .start()
+      .awaitTermination();
+
+    spark.stop();
+  }
+}
diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
new file mode 100644
index 0000000000000..f773467e05ef3
--- /dev/null
+++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ Usage: structured_network_wordcount.py <hostname> <port> <checkpoint dir>
+   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py
+    localhost 9999 <checkpoint dir>`
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+
+if __name__ == "__main__":
+    if len(sys.argv) != 4:
+        print("Usage: structured_network_wordcount.py <hostname> <port> <checkpoint dir>", file=sys.stderr)
+        exit(-1)
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCount")\
+        .getOrCreate()
+
+
+    df = spark.readStream.format('socket').option('host', sys.argv[1])\
+        .option('port', sys.argv[2]).load()
+
+    words = df.select(explode(split(df.value, ' ')).alias('word'))
+    wordCounts = words.groupBy('word').count()
+
+    wordCounts.writeStream.outputMode('complete').format('console')\
+        .option('checkpointLocation', sys.argv[3]).start().awaitTermination()
+
+    spark.stop()
\ No newline at end of file
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
new file mode 100644
index 0000000000000..6873cda596dc7
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import org.apache.spark.sql.{functions, SparkSession}
+import org.apache.spark.sql.streaming.OutputMode
+
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ *
+ * Usage: StructuredNetworkWordCount <hostname> <port> <checkpoint dir>
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount
+ *    localhost 9999 <checkpoint dir>`
+ */
+object StructuredNetworkWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: StructuredNetworkWordCount <hostname> <port> <checkpoint dir>")
+      System.exit(1)
+    }
+
+    val spark = SparkSession
+      .builder
+      .appName("StructuredNetworkWordCount")
+      .getOrCreate()
+
+    import spark.implicits._
+
+    val df = spark.readStream
+      .format("socket")
+      .option("host", args(0))
+      .option("port", args(1))
+      .load().as[String]
+
+    val words = df.select(functions.explode(functions.split(df.col("value"), " ")).alias("word"))
+    val wordCounts = words.groupBy("word").count()
+
+    wordCounts.writeStream
+      .outputMode(OutputMode.Complete())
+      .format("console")
+      .option("checkpointLocation", args(2))
+      .start()
+      .awaitTermination()
+
+    spark.stop()
+  }
+}
+// scalastyle:on println
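The core transformation in all three examples above is explode(split(...)): split turns each input line into an array of words, and explode yields one output row per array element. A minimal batch-mode sketch of the same step in Scala (illustrative only, not part of the patch; the object name and sample data are made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

object ExplodeSplitDemo {
  def main(args: Array[String]): Unit = {
    // Local session; the streaming examples get theirs from run-example/spark-submit
    val spark = SparkSession.builder.appName("ExplodeSplitDemo").master("local[*]").getOrCreate()
    import spark.implicits._

    // Static stand-in for the socket source, which produces a single string column named "value"
    val lines = Seq("apache spark", "spark streaming").toDF("value")

    // Same lines-to-words step as the examples: one output row per word
    val words = lines.select(explode(split($"value", " ")).alias("word"))

    // Word counts on the static data: apache -> 1, spark -> 2, streaming -> 1
    words.groupBy("word").count().show()

    spark.stop()
  }
}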
.outputMode("complete") .format("console") - .option("checkpointLocation", args[2]) - .start() - .awaitTermination(); + .option("checkpointLocation", checkpointDir) + .start(); - spark.stop(); + query.awaitTermination(); } } diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py index f773467e05ef3..48959e05cbca4 100644 --- a/examples/src/main/python/sql/streaming/structured_network_wordcount.py +++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py @@ -39,19 +39,38 @@ print("Usage: network_wordcount.py ", file=sys.stderr) exit(-1) + host = sys.argv[1] + port = int(sys.argv[2]) + checkpointDir = sys.argv[3] + spark = SparkSession\ .builder\ .appName("StructuredNetworkWordCount")\ .getOrCreate() + # input lines (may be multiple words on each line) + lines = spark\ + .readStream\ + .format('socket')\ + .option('host', host)\ + .option('port', port)\ + .load() - df = spark.readStream.format('socket').option('host', sys.argv[1])\ - .option('port', sys.argv[2]).load() + # input words + words = lines.select(\ + explode(\ + split(lines.value, ' ')\ + ).alias('word')\ + ) - words = df.select(explode(split(df.value, ' ')).alias('word')) + # the count for each distinct word wordCounts = words.groupBy('word').count() - wordCounts.writeStream.outputMode('complete').format('console')\ - .option('checkpointLocation', sys.argv[3]).start().awaitTermination() + query = wordCounts\ + .writeStream\ + .outputMode('complete')\ + .format('console')\ + .option('checkpointLocation', checkpointDir)\ + .start() - spark.stop() \ No newline at end of file + query.awaitTermination() diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala index 6873cda596dc7..d2e05db4bd3ba 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala @@ -41,6 +41,10 @@ object StructuredNetworkWordCount { System.exit(1) } + val host = args(0) + val port = args(1).toInt + val checkpointDir = args(2) + val spark = SparkSession .builder .appName("StructuredNetworkWordCount") @@ -48,23 +52,29 @@ object StructuredNetworkWordCount { import spark.implicits._ - val df = spark.readStream + // input lines (may be multiple words on each line) + val lines = spark.readStream .format("socket") - .option("host", args(0)) - .option("port", args(1)) + .option("host", host) + .option("port", port) .load().as[String] - val words = df.select(functions.explode(functions.split(df.col("value"), " ")).alias("word")) + // input words + val words = lines.select( + functions.explode( + functions.split(lines.col("value"), " ") + ).alias("word")) + + // the count for each distinct word val wordCounts = words.groupBy("word").count() - wordCounts.writeStream - .outputMode(OutputMode.Complete()) + val query = wordCounts.writeStream + .outputMode("complete") .format("console") - .option("checkpointLocation", args(2)) + .option("checkpointLocation", checkpointDir) .start() - .awaitTermination() - spark.stop() + query.awaitTermination() } } // scalastyle:on println From 46ac930296cce78d47ff832c9940cf4d017224a2 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Wed, 22 Jun 2016 10:23:50 -0700 Subject: [PATCH 03/10] responded to more comments --- 
From 46ac930296cce78d47ff832c9940cf4d017224a2 Mon Sep 17 00:00:00 2001
From: James Thomas
Date: Wed, 22 Jun 2016 10:23:50 -0700
Subject: [PATCH 03/10] responded to more comments

---
 .../JavaStructuredNetworkWordCount.java       | 14 ++++++-------
 .../streaming/structured_network_wordcount.py |  7 ++++---
 .../StructuredNetworkWordCount.scala          | 20 +++++++++----------
 3 files changed, 19 insertions(+), 22 deletions(-)

diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index cdae1ed038653..99b7029b774d5 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -16,9 +16,7 @@
  */
 package org.apache.spark.examples.sql.streaming;
 
-
 import org.apache.spark.sql.*;
-import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.streaming.StreamingQuery;
 
 import java.util.regex.Pattern;
 
@@ -53,25 +51,25 @@ public static void main(String[] args) throws Exception {
       .appName("JavaStructuredNetworkWordCount")
       .getOrCreate();
 
-    // input lines (may be multiple words on each line)
-    Dataset<String> lines = spark
+    // Create DataFrame representing the stream of input lines from connection to host:port
+    Dataset<Row> lines = spark
       .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load()
-      .as(Encoders.STRING());
+      .load();
 
-    // input words
+    // Split the lines into words
     Dataset<String> words = lines.select(
       functions.explode(
         functions.split(lines.col("value"), " ")
       ).alias("word")
     ).as(Encoders.STRING());
 
-    // the count for each distinct word
+    // Generate running word count
     Dataset<Row> wordCounts = words.groupBy("word").count();
 
+    // Start running the query that prints the running counts to the console
     StreamingQuery query = wordCounts.writeStream()
       .outputMode("complete")
       .format("console")
diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
index 48959e05cbca4..7e2643c017e69 100644
--- a/examples/src/main/python/sql/streaming/structured_network_wordcount.py
+++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
@@ -48,7 +48,7 @@
         .appName("StructuredNetworkWordCount")\
         .getOrCreate()
 
-    # input lines (may be multiple words on each line)
+    # Create DataFrame representing the stream of input lines from connection to host:port
     lines = spark\
         .readStream\
         .format('socket')\
@@ -56,16 +56,17 @@
         .option('port', port)\
         .load()
 
-    # input words
+    # Split the lines into words
     words = lines.select(\
         explode(\
            split(lines.value, ' ')\
        ).alias('word')\
    )
 
-    # the count for each distinct word
+    # Generate running word count
     wordCounts = words.groupBy('word').count()
 
+    # Start running the query that prints the running counts to the console
     query = wordCounts\
         .writeStream\
         .outputMode('complete')\
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
index d2e05db4bd3ba..b63010a324d9e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -18,9 +18,8 @@
 // scalastyle:off println
 package org.apache.spark.examples.sql.streaming
 
-import org.apache.spark.sql.{functions, SparkSession}
-import org.apache.spark.sql.streaming.OutputMode
-
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
 
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
@@ -50,24 +49,23 @@ object StructuredNetworkWordCount {
       .appName("StructuredNetworkWordCount")
       .getOrCreate()
 
-    import spark.implicits._
-
-    // input lines (may be multiple words on each line)
+    // Create DataFrame representing the stream of input lines from connection to host:port
     val lines = spark.readStream
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load().as[String]
+      .load()
 
-    // input words
+    // Split the lines into words
     val words = lines.select(
-      functions.explode(
-        functions.split(lines.col("value"), " ")
+      explode(
+        split(lines.col("value"), " ")
       ).alias("word"))
 
-    // the count for each distinct word
+    // Generate running word count
     val wordCounts = words.groupBy("word").count()
 
+    // Start running the query that prints the running counts to the console
     val query = wordCounts.writeStream
       .outputMode("complete")
       .format("console")

From 80fee206beccc38b7b1e92e47a20ec2525e02b31 Mon Sep 17 00:00:00 2001
From: James Thomas
Date: Wed, 22 Jun 2016 10:48:29 -0700
Subject: [PATCH 04/10] fixed python lint

---
 .../python/sql/streaming/structured_network_wordcount.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
index 7e2643c017e69..833bd76028901 100644
--- a/examples/src/main/python/sql/streaming/structured_network_wordcount.py
+++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
@@ -57,10 +57,10 @@
         .load()
 
     # Split the lines into words
-    words = lines.select(\
-        explode(\
-            split(lines.value, ' ')\
-        ).alias('word')\
+    words = lines.select(
+        explode(
+            split(lines.value, ' ')
+        ).alias('word')
     )
 
     # Generate running word count
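Patch 03 drops the .as[String] / .as(Encoders.STRING()) conversion on the input stream (and patch 05 below does the same for words in the Java version), so the examples now work with untyped DataFrames rather than typed Datasets. A short Scala sketch of the distinction (illustrative only; assumes a SparkSession named spark with import spark.implicits._ in scope):

import org.apache.spark.sql.{DataFrame, Dataset}

// Untyped: a Dataset[Row] carrying the socket source's single "value" string column
val lines: DataFrame = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Typed view of the same stream; needed only for typed operators such as map
val typed: Dataset[String] = lines.as[String]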
From f7aec9d1256790070dc8122b8e70c8855f2de8f4 Mon Sep 17 00:00:00 2001
From: James Thomas
Date: Wed, 22 Jun 2016 15:07:47 -0700
Subject: [PATCH 05/10] small fixes

---
 .../streaming/JavaStructuredNetworkWordCount.java  | 13 +++++--------
 .../sql/streaming/StructuredNetworkWordCount.scala |  3 ++-
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 99b7029b774d5..5a75189a8513e 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -19,8 +19,6 @@
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.streaming.StreamingQuery;
 
-import java.util.regex.Pattern;
-
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
  *
@@ -34,7 +32,6 @@
  * localhost 9999 <checkpoint dir>`
  */
 public final class JavaStructuredNetworkWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
 
   public static void main(String[] args) throws Exception {
     if (args.length < 3) {
@@ -60,11 +57,11 @@ public static void main(String[] args) throws Exception {
       .load();
 
     // Split the lines into words
-    Dataset<String> words = lines.select(
-      functions.explode(
-        functions.split(lines.col("value"), " ")
-      ).alias("word")
-    ).as(Encoders.STRING());
+    Dataset<Row> words = lines.select(
+      functions.explode(
+        functions.split(lines.col("value"), " ")
+      ).alias("word")
+    );
 
     // Generate running word count
     Dataset<Row> wordCounts = words.groupBy("word").count();
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
index b63010a324d9e..96fa5cd1c8c8c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -60,7 +60,8 @@ object StructuredNetworkWordCount {
     val words = lines.select(
       explode(
         split(lines.col("value"), " ")
-      ).alias("word"))
+      ).alias("word")
+    )
 
     // Generate running word count
     val wordCounts = words.groupBy("word").count()

From c3b16a28e91328c5f6e6fb8efd5e3ee9b3cde1ed Mon Sep 17 00:00:00 2001
From: James Thomas
Date: Mon, 27 Jun 2016 10:45:13 -0700
Subject: [PATCH 06/10] New example

---
 .../streaming/NetworkEventTimeWindow.scala | 84 +++++++++++++++++++
 1 file changed, 84 insertions(+)
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/streaming/NetworkEventTimeWindow.scala

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/NetworkEventTimeWindow.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/NetworkEventTimeWindow.scala
new file mode 100644
index 0000000000000..6da67dd08050c
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/NetworkEventTimeWindow.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.TimestampType
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ *
+ * Usage: EventTimeWindowExample <hostname> <port> <checkpoint dir>
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example org.apache.spark.examples.sql.streaming.EventTimeWindowExample + * localhost 9999 ` + */ +object NetworkEventTimeWindow { + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: EventTimeWindowExample ") + System.exit(1) + } + + val host = args(0) + val port = args(1).toInt + val checkpointDir = args(2) + + val spark = SparkSession + .builder + .appName("EventTimeWindowExample") + .getOrCreate() + + import spark.implicits._ + + // Create DataFrame representing the stream of input lines from connection to host:port + val lines = spark.readStream + .format("socket") + .option("host", host) + .option("port", port) + .load().as[String] + + val formatted = lines.map(l => { + val els = l.split(",") + (els(0).trim, els(1).trim.toDouble, els(2).trim) + }) + + val formattedRenamed = formatted.select(formatted.col("_1").alias("device"), + formatted.col("_2").alias("signal"), formatted.col("_3").cast(TimestampType).as("time")) + + val windowedAvgs = formattedRenamed.groupBy( + window(formattedRenamed.col("time"), "1 minute")).avg("signal") + + // Start running the query that prints the running averages to the console + val query = windowedAvgs.writeStream + .outputMode("complete") + .format("console") + .option("checkpointLocation", checkpointDir) + .start() + + query.awaitTermination() + } +} +// scalastyle:on println From fb491c6237380d6419c63c7b24b73c574bee843d Mon Sep 17 00:00:00 2001 From: James Thomas Date: Tue, 28 Jun 2016 09:52:42 -0700 Subject: [PATCH 07/10] addressed comments --- .../JavaStructuredNetworkWordCount.java | 5 +- .../streaming/structured_network_wordcount.py | 3 +- .../sql/streaming/EventTimeWindow.scala | 114 ++++++++++++++++++ .../streaming/NetworkEventTimeWindow.scala | 84 ------------- .../StructuredNetworkWordCount.scala | 5 +- 5 files changed, 122 insertions(+), 89 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/streaming/EventTimeWindow.scala delete mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/streaming/NetworkEventTimeWindow.scala diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java index 5a75189a8513e..727dfaea810d3 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java @@ -23,12 +23,13 @@ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. * * Usage: JavaStructuredNetworkWordCount - * and describe the TCP server that Spark Streaming would connect to receive data. + * and describe the TCP server that Structured Streaming + * would connect to receive data. 
From fb491c6237380d6419c63c7b24b73c574bee843d Mon Sep 17 00:00:00 2001
From: James Thomas
Date: Tue, 28 Jun 2016 09:52:42 -0700
Subject: [PATCH 07/10] addressed comments

---
 .../JavaStructuredNetworkWordCount.java       |   5 +-
 .../streaming/structured_network_wordcount.py |   3 +-
 .../sql/streaming/EventTimeWindow.scala       | 114 ++++++++++++++++++
 .../streaming/NetworkEventTimeWindow.scala    |  84 -------------
 .../StructuredNetworkWordCount.scala          |   5 +-
 5 files changed, 122 insertions(+), 89 deletions(-)
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/streaming/EventTimeWindow.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/streaming/NetworkEventTimeWindow.scala

diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 5a75189a8513e..727dfaea810d3 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -23,12 +23,13 @@
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
  *
  * Usage: JavaStructuredNetworkWordCount <hostname> <port> <checkpoint dir>
- * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
  *
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount
+ *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCount
  *    localhost 9999 <checkpoint dir>`
  */
 public final class JavaStructuredNetworkWordCount {
diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
index 833bd76028901..8275a3146e942 100644
--- a/examples/src/main/python/sql/streaming/structured_network_wordcount.py
+++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py
@@ -18,7 +18,8 @@
 """
  Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
  Usage: structured_network_wordcount.py <hostname> <port> <checkpoint dir>
-   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+   <hostname> and <port> describe the TCP server that Structured Streaming
+   would connect to receive data.
 
  To run this on your local machine, you need to first run a Netcat server
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/EventTimeWindow.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/EventTimeWindow.scala
new file mode 100644
index 0000000000000..a6043bf763b01
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/EventTimeWindow.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{DoubleType, TimestampType}
+
+/**
+ * Computes the average signal from IoT device readings over a sliding window of
+ * configurable duration. The readings are received over the network and must be
+ * UTF8-encoded and separated by '\n'.
+ *
+ * A single reading should take the format
+ * <device>, <signal>, <timestamp>