Skip to content

Commit

Permalink
Update deps and rerun scalafmt.
Browse files Browse the repository at this point in the history
  • Loading branch information
omarkilani committed Sep 7, 2020
1 parent 4e3b5e7 commit 3d13fca
Show file tree
Hide file tree
Showing 14 changed files with 62 additions and 53 deletions.
4 changes: 2 additions & 2 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
version = 2.3.2
align = more
version = 2.6.4
align.preset = more
maxColumn = 120
docstrings = JavaDoc
rewrite.rules = [AvoidInfix, PreferCurlyFors, SortImports, SortModifiers]
Expand Down
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ name := "akka-amqp"

version := "2.6-SNAPSHOT"

scalaVersion := "2.13.1"
scalaVersion := "2.13.3"

val akkaVersion = "2.6.1"
val rabbitmqVersion = "5.8.0"
val akkaVersion = "2.6.8"
val rabbitmqVersion = "5.9.0"
val mockitoVersion = "1.10.19"
val scalatestVersion = "3.1.0"
val scalatestVersion = "3.2.2"
val scalatestPlusVersion = "1.0.0-M2"
val rabbitmqMockVersion = "1.0.13"
val rabbitmqMockVersion = "1.1.1"

libraryDependencies ++= Seq(
"com.rabbitmq" % "amqp-client" % rabbitmqVersion,
Expand Down
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.0")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.3.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
10 changes: 6 additions & 4 deletions src/main/scala/akka/amqp/AmqpExtension.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ class AmqpSettings(config: Config) {
val maxReconnectDelay: Duration = DurationLong(config.getDuration("max-reconnect-delay", MILLISECONDS)).milli
val channelThreads: Int = config.getInt("channel-threads")
val interactionTimeout: Duration = DurationLong(config.getDuration("interaction-timeout", MILLISECONDS)).milli
val channelCreationTimeout
: Duration = DurationLong(config.getDuration("channel-creation-timeout", MILLISECONDS)).milli
val channelReconnectTimeout
: Duration = DurationLong(config.getDuration("channel-reconnect-timeout", MILLISECONDS)).milli
val channelCreationTimeout: Duration = DurationLong(
config.getDuration("channel-creation-timeout", MILLISECONDS)
).milli
val channelReconnectTimeout: Duration = DurationLong(
config.getDuration("channel-reconnect-timeout", MILLISECONDS)
).milli
val publisherConfirmTimeout: FiniteDuration = DurationLong(
config.getDuration("publisher-confirm-timeout", MILLISECONDS)
).milli
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/akka/amqp/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ object ChannelActor {
Props(new ChannelActor(settings) with Stash).withDispatcher("akka.amqp.stashing-dispatcher")
else
Props(new ChannelActor(settings) {
def stash(): Unit = {}
def stash(): Unit = {}
def unstashAll(): Unit = {}
})
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/akka/amqp/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ class ConnectionActor private[amqp] (connectionFactory: ConnectionFactory, setti
/**
* Creates a new channel actor. By default this actor will be able to
* stash messages if it gets disconnected. It will unstash them after reconnecting.
*
*/
def newChannelActor(stashMessages: Boolean = true) = context.actorOf {
ChannelActor(stashMessages, settings)
}
def newChannelActor(stashMessages: Boolean = true) =
context.actorOf {
ChannelActor(stashMessages, settings)
}

startWith(Disconnected, None)

Expand Down
1 change: 0 additions & 1 deletion src/main/scala/akka/amqp/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ trait ChannelConsumer { channelActor: ChannelActor =>
* declare QueueBindings if given, declare Queue and Exchange if they were given to the QueueBinding as undeclared
* setup the Consumer and store the tag.
* track the Queue and Exchange if they were declared, as well as the tag and QueueBinding.
*
*/
def setupConsumer(
channel: RabbitChannel,
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/akka/amqp/MqConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
// val consumer = channel.DurableConsumer(c)(declaredQueue, ref, autoAck, queueBindings : _*)
// (ref,Some(consumer))
// }
//
//
// )
//
//}
4 changes: 2 additions & 2 deletions src/main/scala/akka/amqp/MqPublisher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
// val ref = new FutureActor(
//
// channel.withChannel{ implicit c =>
//
//
// val ref = refFactory.actorOf(Props(actorSetup))
// val publisher = channel.newPublisher(exchange(c))
// (ref,None)
// }
//
//
// )
//}
//
Expand Down
30 changes: 16 additions & 14 deletions src/main/scala/akka/amqp/Publisher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ trait ChannelPublisher extends ConfirmListener { actor: ChannelActor =>

lazy val lock: AnyRef = new Object();
private lazy val confirmHandles = new ConcurrentHashMap[Long, ActorRef]().asScala
private lazy val unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet[Long]()) //Synchronized set must be in sychronized block!
private lazy val unconfirmedSet =
Collections.synchronizedSortedSet(new TreeSet[Long]()) //Synchronized set must be in sychronized block!

// /**
// * Seems like there should be a better way to construct this so that I dont need the preConfirmed HashMap
Expand All @@ -149,23 +150,24 @@ trait ChannelPublisher extends ConfirmListener { actor: ChannelActor =>
*/
private[amqp] def handleNack(seqNo: Long, multiple: Boolean) = handleConfirm(seqNo, multiple, false)

private def handleConfirm(seqNo: Long, multiple: Boolean, ack: Boolean) = lock.synchronized {
private def handleConfirm(seqNo: Long, multiple: Boolean, ack: Boolean) =
lock.synchronized {

if (multiple) {
val headSet = unconfirmedSet.headSet(seqNo + 1)
headSet.asScala.foreach(complete)
headSet.clear();
} else {
unconfirmedSet.remove(seqNo);
complete(seqNo)
}
if (multiple) {
val headSet = unconfirmedSet.headSet(seqNo + 1)
headSet.asScala.foreach(complete)
headSet.clear();
} else {
unconfirmedSet.remove(seqNo);
complete(seqNo)
}

def complete(seqNo: Long): Unit = {
confirmHandles.remove(seqNo).foreach {
_ ! (if (ack) Ack else Nack)
def complete(seqNo: Long): Unit = {
confirmHandles.remove(seqNo).foreach {
_ ! (if (ack) Ack else Nack)
}
}
}
}

}

Expand Down
6 changes: 4 additions & 2 deletions src/test/scala/akka/amqp/AmqpConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ object AmqpConfig {
val settings = new AmqpSettings(config.getConfig("akka.amqp.default"))
}
object Invalid {
val config = ConfigFactory.parseString("""
val config = ConfigFactory
.parseString("""
akka.amqp.default {
addresses = ["invalid-test-connection-no-op:1234"]
}
""").withFallback(Valid.config)
""")
.withFallback(Valid.config)
val settings = new AmqpSettings(config.getConfig("akka.amqp.default"))
}
}
30 changes: 17 additions & 13 deletions src/test/scala/akka/amqp/AmqpTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,22 @@ class MyAnswer(invoke: InvocationOnMock => Unit) extends Answer[Unit] {
val lock: AnyRef = new Object
@volatile
private var runCount: Int = 0
def count = lock.synchronized {
runCount
}
def atLeastOnce = lock.synchronized {
runCount > 0
}
def once = lock.synchronized {
runCount == 1
}
def count =
lock.synchronized {
runCount
}
def atLeastOnce =
lock.synchronized {
runCount > 0
}
def once =
lock.synchronized {
runCount == 1
}

override def answer(invocation: InvocationOnMock): Unit = lock.synchronized {
runCount = runCount + 1
invoke(invocation)
}
override def answer(invocation: InvocationOnMock): Unit =
lock.synchronized {
runCount = runCount + 1
invoke(invocation)
}
}
2 changes: 1 addition & 1 deletion src/test/scala/akka/amqp/ChannelSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class ChannelSpec extends AkkaSpec(AmqpConfig.Valid.config) with AmqpMock {

"Durable Channel Actor" should {
val channelActor = TestFSMRef(new ChannelActor(AmqpConfig.Valid.settings) {
def stash(): Unit = {}
def stash(): Unit = {}
def unstashAll(): Unit = {}
})

Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/akka/amqp/ConnectionSpec.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package akka.amqp

import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition, UnsubscribeTransitionCallBack}
import akka.testkit.{AkkaSpec, TestFSMRef, TestActors}
import akka.testkit.{AkkaSpec, TestActors, TestFSMRef}
import akka.actor.{ActorRef, PoisonPill}
import com.github.fridujo.rabbitmq.mock.MockConnectionFactory
import scala.concurrent.duration._
Expand Down

0 comments on commit 3d13fca

Please sign in to comment.