From 742ada91e0415726b865029b1c9e9e1d6ab2ecb8 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Wed, 16 Oct 2013 13:35:29 +0530 Subject: [PATCH 01/13] mqttinputdstream for mqttstreaming adapter --- .../streaming/dstream/MQTTInputDStream.scala | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala new file mode 100644 index 0000000000000..3416989c02f2f --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext } + +import java.util.Properties +import java.util.concurrent.Executors +import java.io.IOException; + +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttTopic; + +import scala.collection.Map +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ + +/** + * Input stream that subscribe messages from a Mqtt Broker. + * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + +private[streaming] class MQTTInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + brokerUrl: String, + topic: String, + + storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[T] = { + new MQTTReceiver(brokerUrl, topic, storageLevel) + .asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] class MQTTReceiver(brokerUrl: String, + topic: String, + storageLevel: StorageLevel) extends NetworkReceiver[Any] { + + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + //Set up persistence for messages + var peristance:MqttClientPersistence =new MemoryPersistence(); + + //Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance); + + //Connect to MqttBroker + client.connect(); + + //Subscribe to Mqtt topic + client.subscribe(topic); + + //Callback automatically triggers as and when new message arrives on specified topic + var callback: MqttCallback = new MqttCallback() { + + //Handles Mqtt message + override def messageArrived(arg0: String, arg1: MqttMessage) { + blockGenerator += new String(arg1.getPayload()) + } + + override def deliveryComplete(arg0: IMqttDeliveryToken) { + } + + override def connectionLost(arg0: Throwable) { + System.err.println("Connection lost " + arg0) + + } + + }; + + //Set up callback for MqttClient + client.setCallback(callback); + + } + +} From 2e48b23eae6cfc5e7c825573e2739e54c4569045 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Wed, 16 Oct 2013 13:36:25 +0530 Subject: [PATCH 02/13] added mqtt adapter --- .../apache/spark/streaming/StreamingContext.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 878725c705db7..8ed5dfb99b73c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -450,6 +450,21 @@ class StreamingContext private ( inputStream } +/** + * Create an input stream that receives messages pushed by a mqtt publisher. + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = { + val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel) + registerInputStream(inputStream) + inputStream + } /** * Create a unified DStream from multiple DStreams of the same type and same interval */ From 06de3d516d5080a55283192ff4b076ad3f8fc668 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Wed, 16 Oct 2013 13:38:37 +0530 Subject: [PATCH 03/13] added mqtt adapter library dependencies --- project/SparkBuild.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index d038a4f479682..87a90d97b6253 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -94,7 +94,10 @@ object SparkBuild extends Build { // Shared between both core and streaming. resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"), - // For Sonatype publishing + // Shared between both examples and streaming. + resolvers ++= Seq("Mqtt Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"), + + // For Sonatype publishing resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"), @@ -228,6 +231,8 @@ object SparkBuild extends Build { "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm), + "org.eclipse.paho" % "mqtt-client" % "0.4.0", + "org.apache.cassandra" % "cassandra-all" % "1.2.5" exclude("com.google.guava", "guava") exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru") @@ -261,6 +266,7 @@ object SparkBuild extends Build { "Akka Repository" at "http://repo.akka.io/releases/" ), libraryDependencies ++= Seq( + "org.eclipse.paho" % "mqtt-client" % "0.4.0", "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy), "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), From 9eaf68fd4032eaa8e8e8930c14fae2fad3d17d14 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Wed, 16 Oct 2013 13:40:38 +0530 Subject: [PATCH 04/13] added mqtt adapter wordcount example --- .../streaming/examples/MQTTWordCount.scala | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala new file mode 100644 index 0000000000000..04e21bef5eb29 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.examples + +import org.apache.spark.streaming.{ Seconds, StreamingContext } +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream.MQTTReceiver +import org.apache.spark.storage.StorageLevel + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttTopic; + +/** + * A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo for spark streaming" + */ + +object MQTTPublisher { + + var client: MqttClient = _ + + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println( + "Usage: MQTTPublisher ") + System.exit(1) + } + + val Seq(brokerUrl, topic) = args.toSeq + + try { + var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp"); + client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance); + } catch { + case e: MqttException => println("Exception Caught: " + e); + + } + + client.connect(); + + val msgtopic: MqttTopic = client.getTopic(topic); + val msg: String = "hello mqtt demo for spark streaming"; + + while (true) { + val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()); + msgtopic.publish(message); + println("Published data. topic: " + msgtopic.getName() + " Message: " + message); + } + client.disconnect(); + + } +} + +/** + * A sample wordcount with MqttStream stream + * + * To work with Mqtt, Mqtt Message broker/server required. + * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker + * In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto` + * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/ + * Example Java code for Mqtt Publisher and Subscriber can be found here https://bitbucket.org/mkjinesh/mqttclient + * Usage: MQTTWordCount + * In local mode, should be 'local[n]' with n > 1 + * and describe where Mqtt publisher is running. + * + * To run this example locally, you may run publisher as + * `$ ./run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo` + * and run the example as + * `$ ./run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo` + */ + +object MQTTWordCount { + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println( + "Usage: MQTTWordCount " + + " In local mode, should be 'local[n]' with n > 1") + System.exit(1) + } + + val Seq(master, brokerUrl, topic) = args.toSeq + + val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY) + + val words = lines.flatMap(x => x.toString.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + + ssc.start() + } +} + From 7d36a117c1d3a37d73f474d0074e57ee0b781205 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Wed, 16 Oct 2013 13:41:26 +0530 Subject: [PATCH 05/13] add maven dependencies for mqtt --- examples/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/examples/pom.xml b/examples/pom.xml index 224cf6c96c9cc..afce8493cdfb6 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -125,6 +125,11 @@ + + org.eclipse.paho + mqtt-client + 0.4.0 + From 9a7575728d41485407821ea450d52c3a51687de5 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Wed, 16 Oct 2013 13:41:49 +0530 Subject: [PATCH 06/13] add maven dependencies for mqtt --- streaming/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/streaming/pom.xml b/streaming/pom.xml index 7bea069b6158a..c793f207c6d30 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -109,6 +109,11 @@ slf4j-log4j12 test + + org.eclipse.paho + mqtt-client + 0.4.0 + target/scala-${scala.version}/classes From 29245605bf7a17e5ad98f9fae7ec402f35616142 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Thu, 17 Oct 2013 09:57:30 +0530 Subject: [PATCH 07/13] remove unused dependency --- project/SparkBuild.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 87a90d97b6253..7bcc65676e6d3 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -231,8 +231,6 @@ object SparkBuild extends Build { "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm), - "org.eclipse.paho" % "mqtt-client" % "0.4.0", - "org.apache.cassandra" % "cassandra-all" % "1.2.5" exclude("com.google.guava", "guava") exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru") From ee4178f144d7752092da53ceea686fbb6c37d5db Mon Sep 17 00:00:00 2001 From: prabeesh Date: Thu, 17 Oct 2013 09:57:48 +0530 Subject: [PATCH 08/13] remove unused dependency --- examples/pom.xml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/examples/pom.xml b/examples/pom.xml index afce8493cdfb6..224cf6c96c9cc 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -125,11 +125,6 @@ - - org.eclipse.paho - mqtt-client - 0.4.0 - From 890f8fe4393a20749e0a6cfd57ff07f60cfad2a1 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Thu, 17 Oct 2013 10:00:40 +0530 Subject: [PATCH 09/13] modify code, use Spark Logging Class --- .../streaming/dstream/MQTTInputDStream.scala | 61 ++++++++----------- 1 file changed, 26 insertions(+), 35 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala index 3416989c02f2f..9b3fe67e6a4bb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -23,16 +23,16 @@ import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContex import java.util.Properties import java.util.concurrent.Executors -import java.io.IOException; +import java.io.IOException -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttClientPersistence; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.MqttTopic; +import org.eclipse.paho.client.mqttv3.MqttCallback +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic import scala.collection.Map import scala.collection.mutable.HashMap @@ -50,9 +50,7 @@ private[streaming] class MQTTInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging { - def getReceiver(): NetworkReceiver[T] = { new MQTTReceiver(brokerUrl, topic, storageLevel) .asInstanceOf[NetworkReceiver[T]] @@ -62,50 +60,43 @@ private[streaming] class MQTTInputDStream[T: ClassManifest]( private[streaming] class MQTTReceiver(brokerUrl: String, topic: String, storageLevel: StorageLevel) extends NetworkReceiver[Any] { - lazy protected val blockGenerator = new BlockGenerator(storageLevel) - def onStop() { blockGenerator.stop() } - def onStart() { blockGenerator.start() - //Set up persistence for messages - var peristance:MqttClientPersistence =new MemoryPersistence(); + // Set up persistence for messages + var peristance: MqttClientPersistence = new MemoryPersistence() + + // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) - //Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance - var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance); + // Connect to MqttBroker + client.connect() - //Connect to MqttBroker - client.connect(); - - //Subscribe to Mqtt topic - client.subscribe(topic); - - //Callback automatically triggers as and when new message arrives on specified topic + // Subscribe to Mqtt topic + client.subscribe(topic) + + // Callback automatically triggers as and when new message arrives on specified topic var callback: MqttCallback = new MqttCallback() { - //Handles Mqtt message + // Handles Mqtt message override def messageArrived(arg0: String, arg1: MqttMessage) { blockGenerator += new String(arg1.getPayload()) } - + override def deliveryComplete(arg0: IMqttDeliveryToken) { } override def connectionLost(arg0: Throwable) { - System.err.println("Connection lost " + arg0) - + logInfo("Connection lost " + arg0) } + } - }; - - //Set up callback for MqttClient - client.setCallback(callback); - + // Set up callback for MqttClient + client.setCallback(callback) } - } From d223d38933b440df372dce38c6f4181586011c9e Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Fri, 18 Oct 2013 09:09:49 +0530 Subject: [PATCH 10/13] Update MQTTInputDStream.scala --- .../streaming/dstream/MQTTInputDStream.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala index 9b3fe67e6a4bb..ac0528213d329 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -46,24 +46,31 @@ import scala.collection.JavaConversions._ * @param storageLevel RDD storage level. */ -private[streaming] class MQTTInputDStream[T: ClassManifest]( +private[streaming] +class MQTTInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging { + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) with Logging { + def getReceiver(): NetworkReceiver[T] = { new MQTTReceiver(brokerUrl, topic, storageLevel) .asInstanceOf[NetworkReceiver[T]] } } -private[streaming] class MQTTReceiver(brokerUrl: String, +private[streaming] +class MQTTReceiver(brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkReceiver[Any] { + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { lazy protected val blockGenerator = new BlockGenerator(storageLevel) + def onStop() { blockGenerator.stop() } + def onStart() { blockGenerator.start() From 6ec39829e9204c742e364d48c23e106625bba17d Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Fri, 18 Oct 2013 17:00:28 +0530 Subject: [PATCH 11/13] Update MQTTWordCount.scala --- .../streaming/examples/MQTTWordCount.scala | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 04e21bef5eb29..be6587b3168cd 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -22,12 +22,12 @@ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.MQTTReceiver import org.apache.spark.storage.StorageLevel -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttClientPersistence; -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.MqttTopic; +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic /** * A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo for spark streaming" @@ -47,24 +47,24 @@ object MQTTPublisher { val Seq(brokerUrl, topic) = args.toSeq try { - var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp"); - client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance); + var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp") + client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) } catch { - case e: MqttException => println("Exception Caught: " + e); + case e: MqttException => println("Exception Caught: " + e) } - client.connect(); + client.connect() val msgtopic: MqttTopic = client.getTopic(topic); - val msg: String = "hello mqtt demo for spark streaming"; + val msg: String = "hello mqtt demo for spark streaming" while (true) { - val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()); + val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()) msgtopic.publish(message); - println("Published data. topic: " + msgtopic.getName() + " Message: " + message); + println("Published data. topic: " + msgtopic.getName() + " Message: " + message) } - client.disconnect(); + client.disconnect() } } @@ -109,4 +109,3 @@ object MQTTWordCount { ssc.start() } } - From dbafa11396d7c1619f5523fba5ae6abed07e90d9 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Tue, 22 Oct 2013 08:50:34 +0530 Subject: [PATCH 12/13] Update MQTTWordCount.scala --- .../apache/spark/streaming/examples/MQTTWordCount.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index be6587b3168cd..7d06505df77bc 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -30,7 +30,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic /** - * A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo for spark streaming" + * A simple Mqtt publisher for demonstration purposes, repeatedly publishes + * Space separated String Message "hello mqtt demo for spark streaming" */ object MQTTPublisher { @@ -99,13 +100,13 @@ object MQTTWordCount { val Seq(master, brokerUrl, topic) = args.toSeq - val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), + Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY) val words = lines.flatMap(x => x.toString.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() - ssc.start() } } From 9ca1bd95305a904637075e4f5747b28571114fb1 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Tue, 22 Oct 2013 09:05:57 +0530 Subject: [PATCH 13/13] Update MQTTWordCount.scala --- .../apache/spark/streaming/examples/MQTTWordCount.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 7d06505df77bc..af698a01d5118 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -33,15 +33,13 @@ import org.eclipse.paho.client.mqttv3.MqttTopic * A simple Mqtt publisher for demonstration purposes, repeatedly publishes * Space separated String Message "hello mqtt demo for spark streaming" */ - object MQTTPublisher { var client: MqttClient = _ def main(args: Array[String]) { if (args.length < 2) { - System.err.println( - "Usage: MQTTPublisher ") + System.err.println("Usage: MQTTPublisher ") System.exit(1) } @@ -52,7 +50,6 @@ object MQTTPublisher { client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) } catch { case e: MqttException => println("Exception Caught: " + e) - } client.connect() @@ -66,7 +63,6 @@ object MQTTPublisher { println("Published data. topic: " + msgtopic.getName() + " Message: " + message) } client.disconnect() - } } @@ -87,7 +83,6 @@ object MQTTPublisher { * and run the example as * `$ ./run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo` */ - object MQTTWordCount { def main(args: Array[String]) {