Skip to content

Commit

Permalink
Maintain stable connection to rabbitmq from cannon. [WIP]
Browse files Browse the repository at this point in the history
  • Loading branch information
fisx committed Oct 7, 2024
1 parent 2dbcb61 commit 861ee10
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 30 deletions.
44 changes: 32 additions & 12 deletions libs/extended/src/Network/AMQP/Extended.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ module Network.AMQP.Extended
( RabbitMqHooks (..),
RabbitMqAdminOpts (..),
AmqpEndpoint (..),
withConnection,
mkStableRabbitmqConn,
mkRabbitmqConn,
getStableRabbitmqConn,
taintStableRabbitmqConn,
stableRabbitmqConnRepairLoop,
openConnectionWithRetries,
mkRabbitMqAdminClientEnv,
mkRabbitMqChannelMVar,
Expand Down Expand Up @@ -146,15 +150,11 @@ data RabbitMqConnectionError = RabbitMqConnectionFailed String

instance Exception RabbitMqConnectionError

-- | Connects with RabbitMQ and opens a channel.
withConnection ::
forall m a.
(MonadIO m, MonadMask m) =>
Logger ->
AmqpEndpoint ->
(Q.Connection -> m a) ->
m a
withConnection l AmqpEndpoint {..} k = do
-- | Open a RabbitMQ connection with 'mkRabbitmqConn' and store it, wrapped in
-- 'Just', in a freshly created 'MVar'.
--
-- The 'MVar' is the shared handle used by 'getStableRabbitmqConn',
-- 'taintStableRabbitmqConn' and 'stableRabbitmqConnRepairLoop'.
mkStableRabbitmqConn ::
  (MonadIO m, MonadMask m) =>
  Logger ->
  AmqpEndpoint ->
  m (MVar (Maybe Q.Connection))
mkStableRabbitmqConn l ep = do
  conn <- mkRabbitmqConn l ep
  newMVar (Just conn)

mkRabbitmqConn :: (MonadIO m, MonadMask m) => Logger -> AmqpEndpoint -> m Q.Connection
mkRabbitmqConn l AmqpEndpoint {..} = do
(username, password) <- liftIO $ readCredsFromEnv
-- Jittered exponential backoff with 1ms as starting delay and 1s as total
-- wait time.
Expand All @@ -173,7 +173,7 @@ withConnection l AmqpEndpoint {..} k = do
)
( const $ do
Log.info l $ Log.msg (Log.val "Trying to connect to RabbitMQ")
mTlsSettings <- traverse (liftIO . (mkTLSSettings host)) tls
mTlsSettings <- traverse (liftIO . (mkTLSSettings host)) tls -- TODO: error here is about ambiguous record fields i think?
liftIO $
Q.openConnection'' $
Q.defaultConnectionOpts
Expand All @@ -183,7 +183,27 @@ withConnection l AmqpEndpoint {..} k = do
Q.coTLSSettings = fmap Q.TLSCustom mTlsSettings
}
)
bracket getConn (liftIO . Q.closeConnection) k
getConn

-- | Read the current content of the stable-connection 'MVar' without taking it.
--
-- The result is 'Nothing' while the connection is tainted (see
-- 'taintStableRabbitmqConn') and has not been replaced yet.
getStableRabbitmqConn ::
  (MonadIO m) =>
  MVar (Maybe Q.Connection) ->
  m (Maybe Q.Connection)
getStableRabbitmqConn mvar = readMVar mvar

-- | Mark the stable connection as unusable by replacing the content of the
-- 'MVar' with 'Nothing'. The previously stored value is discarded.
taintStableRabbitmqConn ::
  (MonadIO m) =>
  MVar (Maybe Q.Connection) ->
  m ()
taintStableRabbitmqConn mvar = () <$ swapMVar mvar Nothing

-- | Spawn a background thread that watches the stable-connection 'MVar'.
--
-- Whenever the 'MVar' holds 'Nothing' (i.e. the connection has been tainted,
-- see 'taintStableRabbitmqConn'), a new connection is created with
-- 'mkRabbitmqConn' and stored in the 'MVar'. The returned 'Async' is the handle
-- of that thread.
--
-- TODO: exceptions thrown by 'mkRabbitmqConn' are not caught in this block, so
-- they end the loop. It should probably catch at least 'SomeException' (and
-- probably not 'SomeAsyncException').
stableRabbitmqConnRepairLoop ::
  (MonadMask m, MonadUnliftIO m) =>
  Logger ->
  AmqpEndpoint ->
  MVar (Maybe Q.Connection) ->
  m (Async ())
stableRabbitmqConnRepairLoop l ep mvar =
  async . forever $ do
    current <- readMVar mvar
    case current of
      Nothing -> do
        freshConn <- mkRabbitmqConn l ep
        void $ swapMVar mvar (Just freshConn)
        -- After a repair, wait 1.8s before looking at the MVar again.
        threadDelay 1_800_000
      Just _ ->
        -- Checking for Nothing is very cheap, so polling every 100ms is fine.
        threadDelay 100_000

-- | Connects with RabbitMQ and opens a channel. If the channel is closed for
-- some reasons, reopens the channel. If the connection is closed for some
Expand Down
3 changes: 2 additions & 1 deletion services/cannon/src/Cannon/API/Public.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ streamData userId connId clientId con = do
-- | Serve a pending websocket connection for the given user and client with
-- 'rabbitMQWebSocketApp'.
--
-- The websocket environment ('wsenv') and the shared RabbitMQ connection
-- handle ('stableRabbitmqConn') are read from the 'Cannon' environment and
-- passed to the app, which runs in 'IO'.
consumeEvents :: UserId -> ClientId -> PendingConnection -> Cannon ()
consumeEvents userId clientId con = do
  e <- wsenv
  r <- stableRabbitmqConn
  -- The app is started exactly once, with the 5-argument form that takes the
  -- shared connection handle.
  liftIO $ rabbitMQWebSocketApp userId clientId r e con
19 changes: 13 additions & 6 deletions services/cannon/src/Cannon/RabbitMqConsumerApp.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,26 @@ import Data.Aeson qualified as Aeson
import Data.Id
import Imports
import Network.AMQP qualified as Amqp
import Network.AMQP.Extended (withConnection)
import Network.AMQP.Extended (getStableRabbitmqConn)
import Network.WebSockets
import Network.WebSockets qualified as WS
import System.Logger qualified as Log
import Wire.API.Notification
import Wire.API.WebSocket

rabbitMQWebSocketApp :: UserId -> ClientId -> Env -> ServerApp
rabbitMQWebSocketApp uid cid e pendingConn = do
rabbitMQWebSocketApp :: UserId -> ClientId -> MVar (Maybe Amqp.Connection) -> Env -> ServerApp
rabbitMQWebSocketApp uid cid rConn e pendingConn = do
wsConn <- liftIO (acceptRequest pendingConn `catch` rejectOnError pendingConn)
closeWS <- newEmptyMVar
-- TODO: Don't create new conns for every client, this will definitely kill rabbit
withConnection e.logg e.rabbitmq $ \conn -> do
chan <- Amqp.openChannel conn -- TODO: should we open a channel for every request? or have a pool of them?

do
-- FUTUREWORK: we pool connections, but not channels. however, channel pooling is also a
-- thing! we should generate some performance data using otel and decide whether we want
-- to do it.
-- https://stackoverflow.com/questions/10365867/how-can-i-pool-channels-in-rabbitmq
mConn <- getStableRabbitmqConn rConn
chan <- maybe (throwIO ConnectionClosed) Amqp.openChannel mConn

let handleConsumerError :: (Exception e) => e -> IO ()
handleConsumerError err = do
Log.err e.logg $
Expand All @@ -35,6 +41,7 @@ rabbitMQWebSocketApp uid cid e pendingConn = do

handleConnectionClosed :: ConnectionException -> IO ()
handleConnectionClosed err = do
-- TODO: extract the "Log.msg ..." calls into a helper function; don't say "pushing" when handling exceptions raised while pulling; make everything nicer.
Log.info e.logg $
Log.msg (Log.val "Pushing to WS failed, closing connection")
. Log.field "error" (displayException err)
Expand Down
7 changes: 5 additions & 2 deletions services/cannon/src/Cannon/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import Data.Text.Encoding (encodeUtf8)
import Data.Typeable
import Imports hiding (head, threadDelay)
import Network.AMQP
import Network.AMQP.Extended (mkRabbitMqChannelMVar)
import Network.AMQP.Extended (mkRabbitMqChannelMVar, mkStableRabbitmqConn, stableRabbitmqConnRepairLoop)
import Network.Wai qualified as Wai
import Network.Wai.Handler.Warp hiding (run)
import Network.Wai.Middleware.Gzip qualified as Gzip
Expand Down Expand Up @@ -75,13 +75,15 @@ run o = withTracer \tracer -> do
error "drainOpts.gracePeriodSeconds must not be set to 0."
ext <- loadExternal
g <- L.mkLogger (o ^. logLevel) (o ^. logNetStrings) (o ^. logFormat)
rabbitConn <- mkStableRabbitmqConn g (o ^. Cannon.Options.rabbitmq)
rabbitRepairLoop <- stableRabbitmqConnRepairLoop g (o ^. Cannon.Options.rabbitmq) rabbitConn
e <-
mkEnv ext o g
<$> D.empty 128
<*> newManager defaultManagerSettings {managerConnCount = 128}
<*> createSystemRandom
<*> mkClock
<*> pure (o ^. Cannon.Options.rabbitmq)
<*> pure rabbitConn
createUserNotificationsExchange $ applog e
refreshMetricsThread <- Async.async $ runCannon e refreshMetrics
s <- newSettings $ Server (o ^. cannon . host) (o ^. cannon . port) (applog e) (Just idleTimeout)
Expand Down Expand Up @@ -110,6 +112,7 @@ run o = withTracer \tracer -> do
-- but it's a sensitive change, and it looks like this is closing all the websockets at
-- the same time and then calling the drain script. I suspect this might be due to some
-- cleanup in wai. this needs to be tested very carefully when touched.
Async.cancel rabbitRepairLoop
Async.cancel refreshMetricsThread
L.close (applog e)
where
Expand Down
15 changes: 10 additions & 5 deletions services/cannon/src/Cannon/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
module Cannon.Types
( Env,
opts,
stableRabbitmqConn,
applog,
dict,
env,
Expand All @@ -44,7 +45,7 @@ import Control.Lens ((^.))
import Control.Monad.Catch
import Data.Text.Encoding
import Imports
import Network.AMQP.Extended (AmqpEndpoint)
import Network.AMQP (Connection)
import Prometheus
import Servant qualified
import System.Logger qualified as Logger
Expand All @@ -59,6 +60,7 @@ data Env = Env
applog :: !Logger,
dict :: !(Dict Key Websocket),
reqId :: !RequestId,
stableRabbitmqConn_ :: MVar (Maybe Connection),
env :: !WS.Env
}

Expand Down Expand Up @@ -99,11 +101,11 @@ mkEnv ::
Manager ->
GenIO ->
Clock ->
AmqpEndpoint ->
MVar (Maybe Connection) ->
Env
mkEnv external o l d p g t rabbitmqOpts =
Env o l d (RequestId "N/A") $
WS.env external (o ^. cannon . port) (encodeUtf8 $ o ^. gundeck . host) (o ^. gundeck . port) l p d g t (o ^. drainOpts) rabbitmqOpts
mkEnv external o l d p g t stableRabbit =
Env o l d (RequestId "N/A") stableRabbit $
WS.env external (o ^. cannon . port) (encodeUtf8 $ o ^. gundeck . host) (o ^. gundeck . port) l p d g t (o ^. drainOpts)

-- | Run a 'Cannon' action in 'IO': unwrap it with 'unCannon' and supply the
-- given 'Env' to the underlying reader.
runCannon :: Env -> Cannon a -> IO a
runCannon e = (`runReaderT` e) . unCannon
Expand All @@ -117,6 +119,9 @@ wsenv = Cannon $ do
r <- asks reqId
pure $ WS.setRequestId r e

-- | Fetch the shared RabbitMQ connection handle (the 'stableRabbitmqConn_'
-- field) from the 'Cannon' environment.
stableRabbitmqConn :: Cannon (MVar (Maybe Connection))
stableRabbitmqConn = Cannon (asks stableRabbitmqConn_)

-- | Natural transformation from 'Cannon' to 'Handler' monad.
-- Used to call 'Cannon' from servant.
runCannonToServant :: Cannon.Types.Env -> Cannon x -> Servant.Handler x
Expand Down
5 changes: 1 addition & 4 deletions services/cannon/src/Cannon/WS.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ import Data.List.Extra (chunksOf)
import Data.Text.Encoding (decodeUtf8)
import Data.Timeout (TimeoutUnit (..), (#))
import Imports hiding (threadDelay)
import Network.AMQP.Extended
import Network.HTTP.Types.Method
import Network.HTTP.Types.Status
import Network.Wai.Utilities.Error
Expand Down Expand Up @@ -146,8 +145,7 @@ data Env = Env
dict :: !(Dict Key Websocket),
rand :: !GenIO,
clock :: !Clock,
drainOpts :: DrainOpts,
rabbitmq :: !AmqpEndpoint
drainOpts :: DrainOpts
}

setRequestId :: RequestId -> Env -> Env
Expand Down Expand Up @@ -193,7 +191,6 @@ env ::
GenIO ->
Clock ->
DrainOpts ->
AmqpEndpoint ->
Env
env leh lp gh gp = Env leh lp (Bilge.host gh . Bilge.port gp $ empty) (RequestId "N/A")

Expand Down

0 comments on commit 861ee10

Please sign in to comment.