structured streaming network word count examples
James Thomas authored and James Thomas committed Jun 21, 2016
1 parent dbfdae4 commit 38b5497
Showing 3 changed files with 195 additions and 0 deletions.
examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.sql.streaming;


import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;

/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
* Usage: JavaStructuredNetworkWordCount <hostname> <port> <checkpoint dir>
* <hostname> and <port> describe the TCP server that Structured Streaming would connect to
* in order to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount
* localhost 9999 <checkpoint dir>`
*/
public class JavaStructuredNetworkWordCount {

  public static void main(String[] args) throws Exception {
    if (args.length < 3) {
      System.err.println(
        "Usage: JavaStructuredNetworkWordCount <hostname> <port> <checkpoint dir>");
      System.exit(1);
    }

    SparkSession spark = SparkSession
      .builder()
      .appName("JavaStructuredNetworkWordCount")
      .getOrCreate();

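    // Create DataFrame representing the stream of input lines from connection to host:port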
    Dataset<String> lines = spark.readStream().format("socket")
      .option("host", args[0]).option("port", args[1])
      .load().as(Encoders.STRING());

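    // Split the lines into words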
    Dataset<String> words = lines
      .select(functions.explode(functions.split(lines.col("value"), " ")).alias("word"))
      .as(Encoders.STRING());

    Dataset<Row> wordCounts = words.groupBy("word").count();

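    // Start running the query that prints the running counts to the console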
    wordCounts.writeStream()
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("checkpointLocation", args[2])
      .start()
      .awaitTermination();

    spark.stop();
  }
}
examples/src/main/python/sql/streaming/structured_network_wordcount.py
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: structured_network_wordcount.py <hostname> <port> <checkpoint dir>
<hostname> and <port> describe the TCP server that Structured Streaming would connect to
in order to receive data.
To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py
localhost 9999 <checkpoint dir>`
"""
from __future__ import print_function

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: structured_network_wordcount.py <hostname> <port> <checkpoint dir>",
              file=sys.stderr)
        exit(-1)

    spark = SparkSession\
        .builder\
        .appName("StructuredNetworkWordCount")\
        .getOrCreate()

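    # Create DataFrame representing the stream of input lines from connection to host:port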
    lines = spark.readStream.format('socket').option('host', sys.argv[1])\
        .option('port', sys.argv[2]).load()

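    # Split the lines into words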
    words = lines.select(explode(split(lines.value, ' ')).alias('word'))
    wordCounts = words.groupBy('word').count()

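    # Start running the query that prints the running counts to the console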
    wordCounts.writeStream.outputMode('complete').format('console')\
        .option('checkpointLocation', sys.argv[3]).start().awaitTermination()

    spark.stop()
examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples.sql.streaming

import org.apache.spark.sql.{functions, SparkSession}
import org.apache.spark.sql.streaming.OutputMode


/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
* Usage: StructuredNetworkWordCount <hostname> <port> <checkpoint dir>
* <hostname> and <port> describe the TCP server that Structured Streaming would connect to
* in order to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount
* localhost 9999 <checkpoint dir>`
*/
object StructuredNetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: StructuredNetworkWordCount <hostname> <port> <checkpoint dir>")
      System.exit(1)
    }

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    import spark.implicits._

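    // Create DataFrame representing the stream of input lines from connection to host:port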
    val lines = spark.readStream
      .format("socket")
      .option("host", args(0))
      .option("port", args(1))
      .load().as[String]

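    // Split the lines into words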
    val words = lines
      .select(functions.explode(functions.split(lines.col("value"), " ")).alias("word"))
    val wordCounts = words.groupBy("word").count()

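    // Start running the query that prints the running counts to the console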
    wordCounts.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("checkpointLocation", args(2))
      .start()
      .awaitTermination()

    spark.stop()
  }
}
// scalastyle:on println
