-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mqtt Source #40
Mqtt Source #40
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,8 @@ | ||
name := "akka-stream-contrib-mqtt" | ||
|
||
libraryDependencies ++= Seq( | ||
"org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.1.0" | ||
"org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.1.0", // Eclipse Public License Version 1.0 | ||
"io.moquette" % "moquette-broker" % "0.8.1" % Test // Apache License Version 2.0 | ||
) | ||
|
||
resolvers += "moquette" at "http://dl.bintray.com/andsel/maven/" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* | ||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com> | ||
*/ | ||
package akka.stream.contrib.mqtt | ||
|
||
import akka.util.ByteString | ||
import akka.stream.stage._ | ||
import org.eclipse.paho.client.mqttv3.{ MqttMessage => PahoMqttMessage, _ } | ||
import scala.util._ | ||
import scala.language.implicitConversions | ||
|
||
case class MqttSourceSettings( | ||
connectionSettings: MqttConnectionSettings, | ||
topics: Map[String, Int] | ||
) | ||
|
||
case class MqttConnectionSettings( | ||
broker: String, | ||
clientId: String, | ||
persistence: MqttClientPersistence | ||
) | ||
|
||
case class MqttMessage(topic: String, payload: ByteString) | ||
|
||
trait MqttConnectorLogic { this: GraphStageLogic => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
import MqttConnectorLogic._ | ||
|
||
def connectionSettings: MqttConnectionSettings | ||
def onConnect: AsyncCallback[IMqttAsyncClient] | ||
def onMessage: AsyncCallback[MqttMessage] | ||
def onConnectionLost: AsyncCallback[Throwable] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe define the AsyncCallbacks in this trait instead, and have the concrete connector just implement the synchronous callbacks, so source would implement There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. A valid point. Source will have even less coupling to |
||
|
||
final override def preStart(): Unit = { | ||
val client = new MqttAsyncClient( | ||
connectionSettings.broker, | ||
connectionSettings.clientId, | ||
connectionSettings.persistence | ||
) | ||
|
||
client.connect((), connectHandler(client)) | ||
client.setCallback(new MqttCallback { | ||
def messageArrived(topic: String, message: PahoMqttMessage) = | ||
onMessage.invoke(MqttMessage(topic, ByteString(message.getPayload))) | ||
|
||
def deliveryComplete(token: IMqttDeliveryToken) = | ||
println(s"Delivery complete $token") | ||
|
||
def connectionLost(cause: Throwable) = | ||
onConnectionLost.invoke(cause) | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't there a race here, connect happens before There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you are spot on. This might be the cause of racy failures on Travis. |
||
} | ||
|
||
private val connectHandler: IMqttAsyncClient => Try[IMqttToken] => Unit = client => { | ||
case Success(_) => onConnect.invoke(client) | ||
case Failure(ex) => onConnectionLost.invoke(ex) | ||
} | ||
} | ||
|
||
object MqttConnectorLogic { | ||
|
||
implicit def funcToMqttActionListener(func: Try[IMqttToken] => Unit): IMqttActionListener = new IMqttActionListener { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this used only in the internal implementation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. It will probably be used in Sink impl as well. WIll make it |
||
def onSuccess(token: IMqttToken) = func(Success(token)) | ||
def onFailure(token: IMqttToken, ex: Throwable) = func(Failure(ex)) | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com> | ||
*/ | ||
package akka.stream.contrib.mqtt | ||
|
||
import akka.NotUsed | ||
import akka.stream.scaladsl._ | ||
import akka.stream.stage._ | ||
import akka.stream._ | ||
import scala.collection.mutable | ||
import scala.concurrent._ | ||
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient | ||
import org.eclipse.paho.client.mqttv3.IMqttToken | ||
import scala.util.Try | ||
|
||
object MqttSource { | ||
|
||
/** | ||
* Scala API: | ||
*/ | ||
def apply(settings: MqttSourceSettings, bufferSize: Int): Source[MqttMessage, Future[NotUsed]] = | ||
Source.fromGraph(new MqttSource(settings, bufferSize)) | ||
|
||
/** | ||
* Java API: | ||
*/ | ||
def create(settings: MqttSourceSettings, bufferSize: Int): akka.stream.javadsl.Source[MqttMessage, Future[NotUsed]] = | ||
akka.stream.javadsl.Source.fromGraph(new MqttSource(settings, bufferSize)) | ||
} | ||
|
||
final class MqttSource(settings: MqttSourceSettings, bufferSize: Int) extends GraphStageWithMaterializedValue[SourceShape[MqttMessage], Future[NotUsed]] { | ||
|
||
import MqttConnectorLogic._ | ||
|
||
val out = Outlet[MqttMessage]("MqttSource.out") | ||
override val shape: SourceShape[MqttMessage] = SourceShape.of(out) | ||
override protected def initialAttributes: Attributes = Attributes.name("MqttSource") | ||
|
||
private val subscriptionPromise = Promise[NotUsed] | ||
|
||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = (new GraphStageLogic(shape) with MqttConnectorLogic { | ||
|
||
override val connectionSettings = settings.connectionSettings | ||
|
||
override val onConnect = getAsyncCallback[IMqttAsyncClient](handleConnection) | ||
override val onMessage = getAsyncCallback[MqttMessage](handleMessage) | ||
override val onConnectionLost = getAsyncCallback[Throwable](handleConnectionLost) | ||
|
||
private val queue = mutable.Queue[MqttMessage]() | ||
private val mqttSubscriptionCallback: Try[IMqttToken] => Unit = conn => | ||
subscriptionPromise.complete(conn.map { _ => NotUsed }) | ||
|
||
setHandler(out, new OutHandler { | ||
override def onPull(): Unit = { | ||
if (queue.nonEmpty) { | ||
pushAndAckMessage(queue.dequeue()) | ||
} | ||
} | ||
}) | ||
|
||
def handleConnection(client: IMqttAsyncClient) = { | ||
val (topics, qos) = settings.topics.unzip | ||
client.subscribe(topics.toArray, qos.toArray, (), mqttSubscriptionCallback) | ||
} | ||
|
||
def handleMessage(message: MqttMessage): Unit = { | ||
if (isAvailable(out)) { | ||
pushAndAckMessage(message) | ||
} else { | ||
if (queue.size + 1 > bufferSize) { | ||
failStage(new RuntimeException(s"Reached maximum buffer size $bufferSize")) | ||
} else { | ||
queue.enqueue(message) | ||
} | ||
} | ||
} | ||
|
||
def pushAndAckMessage(message: MqttMessage): Unit = { | ||
push(out, message) | ||
} | ||
|
||
def handleConnectionLost(ex: Throwable) = | ||
failStage(ex) | ||
|
||
}, subscriptionPromise.future) | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com> | ||
*/ | ||
package akka.stream.contrib.mqtt | ||
|
||
import akka.actor.ActorSystem | ||
import akka.stream._ | ||
import akka.stream.scaladsl.Keep | ||
import akka.stream.testkit.scaladsl.TestSink | ||
import akka.util.ByteString | ||
import io.moquette.server.Server | ||
import io.moquette.proto.messages.PublishMessage | ||
import io.moquette.proto.messages.AbstractMessage.QOSType | ||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence | ||
import org.scalatest._ | ||
import org.scalatest.concurrent.ScalaFutures | ||
import scala.concurrent.duration._ | ||
|
||
class MqttSourceSpec extends WordSpec with Matchers with ScalaFutures { | ||
|
||
"mqtt source" should { | ||
"receive a message from a topic" in withBroker(Map("topic1" -> 0)) { settings => implicit server => implicit sys => implicit mat => | ||
val (subscriptionFuture, probe) = MqttSource(settings, 8).toMat(TestSink.probe)(Keep.both).run() | ||
whenReady(subscriptionFuture) { _ => | ||
publish("topic1", "ohi") | ||
probe.requestNext shouldBe MqttMessage("topic1", ByteString("ohi")) | ||
} | ||
} | ||
|
||
"receive messages from multiple topics" in withBroker(Map("topic1" -> 0, "topic2" -> 0)) { settings => implicit server => implicit sys => implicit mat => | ||
val (subscriptionFuture, probe) = MqttSource(settings, 8).toMat(TestSink.probe)(Keep.both).run() | ||
whenReady(subscriptionFuture) { _ => | ||
publish("topic1", "ohi") | ||
publish("topic2", "hello again") | ||
probe.requestNext shouldBe MqttMessage("topic1", ByteString("ohi")) | ||
probe.requestNext shouldBe MqttMessage("topic2", ByteString("hello again")) | ||
} | ||
} | ||
|
||
"fail stream when disconnected" in withBroker(Map("topic1" -> 0)) { settings => implicit server => implicit sys => implicit mat => | ||
val (subscriptionFuture, probe) = MqttSource(settings, 8).toMat(TestSink.probe)(Keep.both).run() | ||
whenReady(subscriptionFuture) { _ => | ||
publish("topic1", "ohi") | ||
probe.requestNext shouldBe MqttMessage("topic1", ByteString("ohi")) | ||
|
||
server.stopServer() | ||
probe.expectError.getMessage should be("Connection lost") | ||
} | ||
} | ||
} | ||
|
||
def publish(topic: String, payload: String)(implicit server: Server) = { | ||
val msg = new PublishMessage() | ||
msg.setPayload(ByteString(payload).toByteBuffer) | ||
msg.setTopicName(topic) | ||
msg.setQos(QOSType.MOST_ONE) | ||
server.internalPublish(msg) | ||
} | ||
|
||
def withBroker(subscriptions: Map[String, Int])(test: MqttSourceSettings => Server => ActorSystem => Materializer => Any) = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A matter of taste but I'd have done this as a trait each test case implements instead to get rid of the boilerplate-curry-signature. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. This is too verbose. Will move to to a trait. I think there was something on abstracting over implicit parameter lists in scala/dotty roadmap... :) |
||
implicit val sys = ActorSystem("MqttSourceSpec") | ||
val mat = ActorMaterializer() | ||
|
||
val settings = MqttSourceSettings( | ||
MqttConnectionSettings( | ||
"tcp://localhost:1883", | ||
"test-client", | ||
new MemoryPersistence | ||
), | ||
subscriptions | ||
) | ||
|
||
val server = new Server() | ||
server.startServer() | ||
try { | ||
test(settings)(server)(sys)(mat) | ||
} finally { | ||
server.stopServer() | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
for allcase class