Skip to content

Commit

Permalink
Merge pull request #528 from catostrophe/widen-dispatcher-scope
Browse files Browse the repository at this point in the history
Couple Dispatcher lifetime to stream
  • Loading branch information
gvolpe authored Apr 1, 2021
2 parents 494d1a1 + 51a0c7f commit ffd4561
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@ private[pubsub] class LivePubSubCommands[F[_]: Async: RedisExecutor: Log, K, V](
subCommands.unsubscribe(channel)

override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
_.evalMap { message =>
state.get.flatMap { st =>
PubSubInternals[F, K, V](state, subConnection).apply(channel)(st) *>
FutureLift[F].lift(Sync[F].delay(pubConnection.async().publish(channel.underlying, message))).void
}
_.flatMap { message =>
Stream.resource(Resource.eval(state.get) >>= PubSubInternals[F, K, V](state, subConnection).apply(channel)) >>
Stream.eval(FutureLift[F].lift(Sync[F].delay(pubConnection.async().publish(channel.underlying, message))).void)
}

override def pubSubChannels: Stream[F, List[K]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package dev.profunktor.redis4cats.pubsub.internals

import cats.Applicative
import cats.effect.kernel.{ Async, Ref, Sync }
import cats.effect.kernel.{ Async, Ref, Resource, Sync }
import cats.effect.std.Dispatcher
import cats.syntax.all._
import dev.profunktor.redis4cats.data.RedisChannel
Expand Down Expand Up @@ -50,15 +49,15 @@ object PubSubInternals {
): GetOrCreateTopicListener[F, K, V] = { channel => st =>
st.get(channel.underlying)
.fold {
Dispatcher[F].use { dispatcher =>
Dispatcher[F].evalMap { dispatcher =>
Topic[F, Option[V]].flatTap { topic =>
val listener = defaultListener(channel, topic, dispatcher)
Log[F].info(s"Creating listener for channel: $channel") *>
Sync[F].delay(subConnection.addListener(listener)) *>
state.update(_.updated(channel.underlying, topic))
}
}
}(Applicative[F].pure)
}(Resource.pure)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ private[pubsub] class Subscriber[F[_]: Async: FutureLift: Log: RedisExecutor, K,

override def subscribe(channel: RedisChannel[K]): Stream[F, V] =
Stream
.eval(
state.get.flatMap { st =>
PubSubInternals[F, K, V](state, subConnection).apply(channel)(st) <*
FutureLift[F].lift(Sync[F].delay(subConnection.async().subscribe(channel.underlying)))
}
)
.resource(Resource.eval(state.get) >>= PubSubInternals[F, K, V](state, subConnection).apply(channel))
.evalTap(_ => FutureLift[F].lift(Sync[F].delay(subConnection.async().subscribe(channel.underlying))))
.flatMap(_.subscribe(500).unNone)

override def unsubscribe(channel: RedisChannel[K]): Stream[F, Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

package dev.profunktor.redis4cats.pubsub

import cats.effect.kernel.Resource
import dev.profunktor.redis4cats.data.RedisChannel
import fs2.concurrent.Topic

package object internals {
private[pubsub] type PubSubState[F[_], K, V] = Map[K, Topic[F, Option[V]]]
private[pubsub] type GetOrCreateTopicListener[F[_], K, V] =
RedisChannel[K] => PubSubState[F, K, V] => F[Topic[F, Option[V]]]
RedisChannel[K] => PubSubState[F, K, V] => Resource[F, Topic[F, Option[V]]]
}

0 comments on commit ffd4561

Please sign in to comment.