diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Consume.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Consume.scala index ac8d3f76..92cb8023 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Consume.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Consume.scala @@ -21,7 +21,7 @@ import cats.effect.std.Dispatcher import cats.syntax.flatMap._ import cats.syntax.functor._ import cats.{Applicative, Functor} -import com.rabbitmq.client.{AMQP, Consumer, DefaultConsumer, Envelope} +import com.rabbitmq.client.{AMQP, Consumer, DefaultConsumer, Envelope, ShutdownSignalException} import dev.profunktor.fs2rabbit.arguments.{Arguments, _} import dev.profunktor.fs2rabbit.model._ @@ -106,6 +106,11 @@ object Consume { } } } + + override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = + if (!sig.isInitiatedByApplication) { + internals.queue.foreach(q => dispatcher.unsafeRunAndForget(q.offer(Left(sig)))) + } } } diff --git a/docker-compose.yml b/docker-compose.yml index 752f0e0a..966c5800 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,3 +5,5 @@ RabbitMQ: - "5672:5672" environment: - DEBUG=false + volumes: + - ./rabbit-test-config/:/etc/rabbitmq/ diff --git a/rabbit-test-config/advanced.config b/rabbit-test-config/advanced.config new file mode 100644 index 00000000..87d8581a --- /dev/null +++ b/rabbit-test-config/advanced.config @@ -0,0 +1,6 @@ +[ + {rabbit, [ + {channel_tick_interval, 500}, + {loopback_users, []} + ]} +]. \ No newline at end of file diff --git a/rabbit-test-config/rabbitmq.conf b/rabbit-test-config/rabbitmq.conf new file mode 100644 index 00000000..399d6cd0 --- /dev/null +++ b/rabbit-test-config/rabbitmq.conf @@ -0,0 +1 @@ +consumer_timeout = 1000 \ No newline at end of file diff --git a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala index a8ef7d00..8e1d9a4d 100644 --- a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala +++ b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala @@ -711,6 +711,28 @@ trait Fs2RabbitSpec { self: BaseSpec => } } + it should "shutdown the stream when the server closes the channel" in withRabbit { interpreter => + import interpreter._ + val msg = "will-not-be-acked" + createConnectionChannel.use { implicit channel => + for { + qxrk <- randomQueueData + (q, x, rk) = qxrk + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) + publisher <- createPublisher[String](x, rk) + _ <- publisher(msg) + stream <- createAckerConsumer(q).map(_._2) + results <- stream.attempt.compile.toList.timeoutAndForget(Duration(10, "s")) + } yield { + results.size shouldEqual 2 + results.head.map(_.payload) shouldEqual Right(msg) + results.last.isLeft shouldEqual true + } + } + } + it should "preserve order of published messages" in withRabbit { interpreter => import dev.profunktor.fs2rabbit.effects.{EnvelopeDecoder, MessageEncoder} import interpreter._