Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt Source #40

Merged
merged 1 commit into from
Aug 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion mqtt/build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
name := "akka-stream-contrib-mqtt"

libraryDependencies ++= Seq(
"org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.1.0"
"org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.1.0", // Eclipse Public License Version 1.0
"io.moquette" % "moquette-broker" % "0.8.1" % Test // Apache License Version 2.0
)

resolvers += "moquette" at "http://dl.bintray.com/andsel/maven/"
67 changes: 67 additions & 0 deletions mqtt/src/main/scala/Mqtt.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.contrib.mqtt

import akka.util.ByteString
import akka.stream.stage._
import org.eclipse.paho.client.mqttv3.{ MqttMessage => PahoMqttMessage, _ }
import scala.util._
import scala.language.implicitConversions

case class MqttSourceSettings(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final for all case class

connectionSettings: MqttConnectionSettings,
topics: Map[String, Int]
)

case class MqttConnectionSettings(
broker: String,
clientId: String,
persistence: MqttClientPersistence
)

case class MqttMessage(topic: String, payload: ByteString)

trait MqttConnectorLogic { this: GraphStageLogic =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[mqtt]?


import MqttConnectorLogic._

def connectionSettings: MqttConnectionSettings
def onConnect: AsyncCallback[IMqttAsyncClient]
def onMessage: AsyncCallback[MqttMessage]
def onConnectionLost: AsyncCallback[Throwable]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe define the AsyncCallbacks in this trait instead, and have the concrete connector just implement the synchronous callbacks, so source would implement onMessage(message: MqTTMessage)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. A valid point. Source will have even less coupling to paho APIs.


final override def preStart(): Unit = {
val client = new MqttAsyncClient(
connectionSettings.broker,
connectionSettings.clientId,
connectionSettings.persistence
)

client.connect((), connectHandler(client))
client.setCallback(new MqttCallback {
def messageArrived(topic: String, message: PahoMqttMessage) =
onMessage.invoke(MqttMessage(topic, ByteString(message.getPayload)))

def deliveryComplete(token: IMqttDeliveryToken) =
println(s"Delivery complete $token")

def connectionLost(cause: Throwable) =
onConnectionLost.invoke(cause)
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a race here, connect happens before setCallback or does the client not accept messages before callback is set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are spot on. This might be the cause of racy failures on Travis.

}

private val connectHandler: IMqttAsyncClient => Try[IMqttToken] => Unit = client => {
case Success(_) => onConnect.invoke(client)
case Failure(ex) => onConnectionLost.invoke(ex)
}
}

object MqttConnectorLogic {

implicit def funcToMqttActionListener(func: Try[IMqttToken] => Unit): IMqttActionListener = new IMqttActionListener {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this used only in the internal implementation?

Copy link
Contributor Author

@2m 2m Aug 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It will probably be used in Sink impl as well. WIll make it private[mqtt]

def onSuccess(token: IMqttToken) = func(Success(token))
def onFailure(token: IMqttToken, ex: Throwable) = func(Failure(ex))
}

}
87 changes: 87 additions & 0 deletions mqtt/src/main/scala/MqttSource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.contrib.mqtt

import akka.NotUsed
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.stream._
import scala.collection.mutable
import scala.concurrent._
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient
import org.eclipse.paho.client.mqttv3.IMqttToken
import scala.util.Try

object MqttSource {

/**
* Scala API:
*/
def apply(settings: MqttSourceSettings, bufferSize: Int): Source[MqttMessage, Future[NotUsed]] =
Source.fromGraph(new MqttSource(settings, bufferSize))

/**
* Java API:
*/
def create(settings: MqttSourceSettings, bufferSize: Int): akka.stream.javadsl.Source[MqttMessage, Future[NotUsed]] =
akka.stream.javadsl.Source.fromGraph(new MqttSource(settings, bufferSize))
}

final class MqttSource(settings: MqttSourceSettings, bufferSize: Int) extends GraphStageWithMaterializedValue[SourceShape[MqttMessage], Future[NotUsed]] {

import MqttConnectorLogic._

val out = Outlet[MqttMessage]("MqttSource.out")
override val shape: SourceShape[MqttMessage] = SourceShape.of(out)
override protected def initialAttributes: Attributes = Attributes.name("MqttSource")

private val subscriptionPromise = Promise[NotUsed]

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = (new GraphStageLogic(shape) with MqttConnectorLogic {

override val connectionSettings = settings.connectionSettings

override val onConnect = getAsyncCallback[IMqttAsyncClient](handleConnection)
override val onMessage = getAsyncCallback[MqttMessage](handleMessage)
override val onConnectionLost = getAsyncCallback[Throwable](handleConnectionLost)

private val queue = mutable.Queue[MqttMessage]()
private val mqttSubscriptionCallback: Try[IMqttToken] => Unit = conn =>
subscriptionPromise.complete(conn.map { _ => NotUsed })

setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (queue.nonEmpty) {
pushAndAckMessage(queue.dequeue())
}
}
})

def handleConnection(client: IMqttAsyncClient) = {
val (topics, qos) = settings.topics.unzip
client.subscribe(topics.toArray, qos.toArray, (), mqttSubscriptionCallback)
}

def handleMessage(message: MqttMessage): Unit = {
if (isAvailable(out)) {
pushAndAckMessage(message)
} else {
if (queue.size + 1 > bufferSize) {
failStage(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
queue.enqueue(message)
}
}
}

def pushAndAckMessage(message: MqttMessage): Unit = {
push(out, message)
}

def handleConnectionLost(ex: Throwable) =
failStage(ex)

}, subscriptionPromise.future)

}
82 changes: 82 additions & 0 deletions mqtt/src/test/scala/MqttSourceSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.contrib.mqtt

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.TestSink
import akka.util.ByteString
import io.moquette.server.Server
import io.moquette.proto.messages.PublishMessage
import io.moquette.proto.messages.AbstractMessage.QOSType
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._

class MqttSourceSpec extends WordSpec with Matchers with ScalaFutures {

"mqtt source" should {
"receive a message from a topic" in withBroker(Map("topic1" -> 0)) { settings => implicit server => implicit sys => implicit mat =>
val (subscriptionFuture, probe) = MqttSource(settings, 8).toMat(TestSink.probe)(Keep.both).run()
whenReady(subscriptionFuture) { _ =>
publish("topic1", "ohi")
probe.requestNext shouldBe MqttMessage("topic1", ByteString("ohi"))
}
}

"receive messages from multiple topics" in withBroker(Map("topic1" -> 0, "topic2" -> 0)) { settings => implicit server => implicit sys => implicit mat =>
val (subscriptionFuture, probe) = MqttSource(settings, 8).toMat(TestSink.probe)(Keep.both).run()
whenReady(subscriptionFuture) { _ =>
publish("topic1", "ohi")
publish("topic2", "hello again")
probe.requestNext shouldBe MqttMessage("topic1", ByteString("ohi"))
probe.requestNext shouldBe MqttMessage("topic2", ByteString("hello again"))
}
}

"fail stream when disconnected" in withBroker(Map("topic1" -> 0)) { settings => implicit server => implicit sys => implicit mat =>
val (subscriptionFuture, probe) = MqttSource(settings, 8).toMat(TestSink.probe)(Keep.both).run()
whenReady(subscriptionFuture) { _ =>
publish("topic1", "ohi")
probe.requestNext shouldBe MqttMessage("topic1", ByteString("ohi"))

server.stopServer()
probe.expectError.getMessage should be("Connection lost")
}
}
}

def publish(topic: String, payload: String)(implicit server: Server) = {
val msg = new PublishMessage()
msg.setPayload(ByteString(payload).toByteBuffer)
msg.setTopicName(topic)
msg.setQos(QOSType.MOST_ONE)
server.internalPublish(msg)
}

def withBroker(subscriptions: Map[String, Int])(test: MqttSourceSettings => Server => ActorSystem => Materializer => Any) = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A matter of taste but I'd have done this as a trait each test case implements instead to get rid of the boilerplate-curry-signature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. This is too verbose. Will move to to a trait. I think there was something on abstracting over implicit parameter lists in scala/dotty roadmap... :)

implicit val sys = ActorSystem("MqttSourceSpec")
val mat = ActorMaterializer()

val settings = MqttSourceSettings(
MqttConnectionSettings(
"tcp://localhost:1883",
"test-client",
new MemoryPersistence
),
subscriptions
)

val server = new Server()
server.startServer()
try {
test(settings)(server)(sys)(mat)
} finally {
server.stopServer()
}
}

}