Skip to content

Commit

Permalink
Semaphore for sending
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Mar 19, 2022
1 parent 147346e commit 73b4a07
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions dom/src/main/scala/org/http4s/dom/WebSocketClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import scodec.bits.ByteVector

import scala.scalajs.js
import scala.scalajs.js.JSConverters._
import fs2.Pipe

final class WebSocketException private[dom] (
private[dom] val reason: String
Expand All @@ -54,7 +55,8 @@ object WebSocketClient {
for {
dispatcher <- Dispatcher[F]
messages <- Queue.unbounded[F, Option[MessageEvent]].toResource
semaphore <- Semaphore[F](1).toResource
receiveSemaphore <- Semaphore[F](1).toResource
sendSemaphore <- Semaphore[F](1).toResource
error <- F.deferred[Either[Throwable, INothing]].toResource
close <- F.deferred[CloseEvent].toResource
ws <- Resource.makeCase {
Expand Down Expand Up @@ -125,15 +127,15 @@ object WebSocketClient {
def closeFrame: DeferredSource[F, WSFrame.Close] =
(close: DeferredSource[F, CloseEvent]).map(e => WSFrame.Close(e.code, e.reason))

def receive: F[Option[WSDataFrame]] = semaphore
def receive: F[Option[WSDataFrame]] = receiveSemaphore
.permit
.use(_ => OptionT(messages.take).semiflatMap(decodeMessage).value)
.surround(OptionT(messages.take).semiflatMap(decodeMessage).value)
.race(error.get.rethrow)
.map(_.merge)

override def receiveStream: Stream[F, WSDataFrame] =
Stream
.resource(semaphore.permit)
.resource(receiveSemaphore.permit)
.flatMap(_ => Stream.fromQueueNoneTerminated(messages))
.evalMap(decodeMessage)
.concurrently(Stream.exec(error.get.rethrow.widen))
Expand All @@ -150,10 +152,10 @@ object WebSocketClient {
}

override def sendText(text: String): F[Unit] =
errorOr(F.delay(ws.send(text)))
errorOr(sendSemaphore.permit.surround(F.delay(ws.send(text))))

override def sendBinary(bytes: ByteVector): F[Unit] =
errorOr(F.delay(ws.send(bytes.toJSArrayBuffer)))
errorOr(sendSemaphore.permit.surround(F.delay(ws.send(bytes.toJSArrayBuffer))))

def send(wsf: WSDataFrame): F[Unit] =
wsf match {
Expand All @@ -169,7 +171,10 @@ object WebSocketClient {
}

def sendMany[G[_]: Foldable, A <: WSDataFrame](wsfs: G[A]): F[Unit] =
wsfs.foldMapM(send(_))
sendSemaphore.permit.surround(wsfs.foldMapM(send(_)))

override def sendPipe: Pipe[F, WSDataFrame, Unit] = in =>
Stream.resource(sendSemaphore.permit) >> in.evalMap(send(_))

def subprotocol: Option[String] = Option(ws.protocol).filter(_.nonEmpty)
}
Expand Down

0 comments on commit 73b4a07

Please sign in to comment.