-
Notifications
You must be signed in to change notification settings - Fork 87
/
Copy pathServer.hs
203 lines (184 loc) · 6.73 KB
/
Server.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
{-# LANGUAGE UndecidableInstances #-}
module Hydra.API.Server where
import Hydra.Prelude hiding (TVar, mapM_, readTVar, seq)
import Cardano.Ledger.Core (PParams)
import Conduit (runConduitRes, sinkList, (.|))
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO)
import Control.Exception (IOException)
import Data.Conduit.Combinators (iterM)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.HTTPServer (httpApp)
import Hydra.API.Projection (Projection (..), mkProjection)
import Hydra.API.ServerOutput (
CommitInfo (CannotCommit),
HeadStatus (Idle),
ServerOutput,
TimedServerOutput (..),
projectCommitInfo,
projectHeadStatus,
projectInitializingHeadId,
projectPendingDeposits,
projectSnapshotUtxo,
)
import Hydra.API.ServerOutputFilter (
ServerOutputFilter,
)
import Hydra.API.WSServer (nextSequenceNumber, wsApp)
import Hydra.Cardano.Api (LedgerEra)
import Hydra.Chain (Chain (..))
import Hydra.Chain.ChainState (IsChainState)
import Hydra.Chain.Direct.State ()
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (IP, PortNumber)
import Hydra.Persistence (PersistenceIncremental (..))
import Hydra.Tx (Party)
import Hydra.Tx.Environment (Environment)
import Network.HTTP.Types (status500)
import Network.Wai (responseLBS)
import Network.Wai.Handler.Warp (
defaultSettings,
runSettings,
setBeforeMainLoop,
setHost,
setOnException,
setOnExceptionResponse,
setPort,
)
import Network.Wai.Handler.WarpTLS (runTLS, tlsSettings)
import Network.Wai.Handler.WebSockets (websocketsOr)
import Network.Wai.Middleware.Cors (simpleCors)
import Network.WebSockets (
defaultConnectionOptions,
)
-- | Handle to provide a means for sending server outputs to clients.
--
-- A value of this type is constructed by 'withAPIServer' and handed to the
-- action that runs alongside the server.
newtype Server tx m = Server
  { sendOutput :: ServerOutput tx -> m ()
  -- ^ Send some output to all connected clients.
  }
-- | Callback for receiving client inputs.
--
-- 'withAPIServer' passes this callback on to both the WebSocket and the HTTP
-- application.
type ServerCallback tx m = ClientInput tx -> m ()
-- | A type tying both receiving input and sending output into a /Component/.
--
-- It takes a 'ServerCallback' to handle inputs and a continuation which is
-- given the 'Server' handle for sending outputs.
type ServerComponent tx m a = ServerCallback tx m -> (Server tx m -> m a) -> m a
-- | Configuration of the API server as consumed by 'withAPIServer'.
data APIServerConfig = APIServerConfig
  { host :: IP
  -- ^ Host the server is bound to.
  , port :: PortNumber
  -- ^ Port the server listens on.
  , tlsCertPath :: Maybe FilePath
  -- ^ Path to the TLS certificate. TLS is only used when 'tlsKeyPath' is given
  -- as well.
  , tlsKeyPath :: Maybe FilePath
  -- ^ Path to the TLS key. TLS is only used when 'tlsCertPath' is given as
  -- well.
  }
-- | Run the API server and pass a 'Server' handle to the given action.
--
-- On start-up, the stored 'TimedServerOutput's are streamed from the 'source'
-- of the given persistence to populate all projections (head status, snapshot
-- UTxO, commit info, head id and pending deposits) as well as the in-memory
-- history.
--
-- The server, which uses 'wsApp' for WebSocket requests and 'httpApp'
-- otherwise, is run via 'race_' against the given action. The action is only
-- started once the server signalled readiness through 'setBeforeMainLoop'.
--
-- Any 'IOException' raised while running the server is rethrown as a
-- 'RunServerException' carrying the configured host and port.
withAPIServer ::
  forall tx.
  IsChainState tx =>
  APIServerConfig ->
  Environment ->
  Party ->
  PersistenceIncremental (TimedServerOutput tx) IO ->
  Tracer IO APIServerLog ->
  Chain tx IO ->
  PParams LedgerEra ->
  ServerOutputFilter tx ->
  ServerComponent tx IO ()
withAPIServer config env party persistence tracer chain pparams serverOutputFilter callback action = do
  -- Channel used to hand every new output to the WebSocket application.
  responseChannel <- newBroadcastTChanIO
  -- Initialize our read models from stored events
  -- NOTE: we do not keep the stored events around in memory
  headStatusP <- mkProjection Idle projectHeadStatus
  snapshotUtxoP <- mkProjection Nothing projectSnapshotUtxo
  commitInfoP <- mkProjection CannotCommit projectCommitInfo
  headIdP <- mkProjection Nothing projectInitializingHeadId
  pendingDepositsP <- mkProjection [] projectPendingDeposits
  -- Replay every stored output through each projection while streaming and
  -- collect the stream into a list afterwards.
  loadedHistory <-
    runConduitRes $
      source
        -- .| mapC output
        .| iterM (lift . atomically . update headStatusP . output)
        .| iterM (lift . atomically . update snapshotUtxoP . output)
        .| iterM (lift . atomically . update commitInfoP . output)
        .| iterM (lift . atomically . update headIdP . output)
        .| iterM (lift . atomically . update pendingDepositsP . output)
        -- FIXME: don't load whole history into memory
        .| sinkList
  -- NOTE: we need to reverse the list because we store history in a reversed
  -- list in memory but in order on disk
  history <- newTVarIO $ reverse loadedHistory
  (notifyServerRunning, waitForServerRunning) <- setupServerNotification
  let serverSettings =
        defaultSettings
          & setHost (fromString $ show host)
          & setPort (fromIntegral port)
          -- Trace exceptions reported by warp instead of dropping them.
          & setOnException (\_ e -> traceWith tracer $ APIConnectionError{reason = show e})
          -- Respond with status 500 and the shown exception as body.
          & setOnExceptionResponse (responseLBS status500 [] . show)
          -- Signal the waiting action below that the server is ready.
          & setBeforeMainLoop notifyServerRunning
  race_
    -- The server: any 'IOException' is wrapped into a 'RunServerException'.
    ( handle onIOException $ do
        traceWith tracer (APIServerStarted port)
        startServer serverSettings
          . simpleCors
          $ websocketsOr
            defaultConnectionOptions
            (wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel serverOutputFilter)
            (httpApp tracer chain env pparams (atomically $ getLatest commitInfoP) (atomically $ getLatest snapshotUtxoP) (atomically $ getLatest pendingDepositsP) callback)
    )
    -- The client of this component: only started once the server is running.
    ( do
        waitForServerRunning
        action $
          Server
            { sendOutput = \output -> do
                -- First record the output in history and persistence, then
                -- update all projections and notify clients in a single STM
                -- transaction.
                timedOutput <- appendToHistory history output
                atomically $ do
                  update headStatusP output
                  update commitInfoP output
                  update snapshotUtxoP output
                  update headIdP output
                  update pendingDepositsP output
                  writeTChan responseChannel timedOutput
            }
    )
 where
  APIServerConfig{host, port, tlsCertPath, tlsKeyPath} = config

  PersistenceIncremental{source, append} = persistence

  -- Serve over TLS only when both certificate and key are configured, serve
  -- plainly when neither is, and exit via 'die' when only one of them is given.
  startServer settings app =
    case (tlsCertPath, tlsKeyPath) of
      (Just cert, Just key) ->
        runTLS (tlsSettings cert key) settings app
      -- TODO: better error handling
      (Just _, Nothing) ->
        die "TLS certificate provided without key"
      (Nothing, Just _) ->
        die "TLS key provided without certificate"
      _ ->
        runSettings settings app

  -- Stamp the output with the current time and the next sequence number,
  -- prepend it to the in-memory history and append it to the persistence.
  appendToHistory history output = do
    time <- getCurrentTime
    timedOutput <- atomically $ do
      seq <- nextSequenceNumber history
      let timedOutput = TimedServerOutput{output, time, seq}
      modifyTVar' history (timedOutput :)
      pure timedOutput
    append timedOutput
    pure timedOutput

  -- Rethrow an 'IOException' with the configured host and port attached.
  onIOException ioException =
    throwIO
      RunServerException
        { ioException
        , host
        , port
        }
-- | An 'IOException' enriched with the 'IP' and 'PortNumber' of the server as
-- additional context. Thrown by 'withAPIServer' when running the server raises
-- an 'IOException'.
data RunServerException = RunServerException
  { ioException :: IOException
  , host :: IP
  , port :: PortNumber
  }
  deriving stock (Show)

instance Exception RunServerException
type NotifyServerRunning = IO ()

type WaitForServer = IO ()

-- | Create a pair of actions sharing one fresh, initially empty 'MVar': the
-- first fills it to announce that the server is listening, the second takes
-- from it and hence blocks until that announcement was made.
setupServerNotification :: IO (NotifyServerRunning, WaitForServer)
setupServerNotification = notifierAndWaiter <$> newEmptyMVar
 where
  -- Both actions close over the same signal variable.
  notifierAndWaiter signal = (putMVar signal (), takeMVar signal)