-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathHeadNode.hs
238 lines (226 loc) · 8.23 KB
/
HeadNode.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE BangPatterns #-}
module HydraSim.HeadNode
( HeadNode,
newNode,
connectNodes,
startNode,
traceState
) where
import Control.Monad (forever, void)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork (myThreadId, labelThread)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer
import Control.Tracer
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import HydraSim.Channel
import HydraSim.DelayedComp
import HydraSim.MSig.Mock
import HydraSim.Multiplexer
import HydraSim.Sized
import HydraSim.Trace
import HydraSim.Tx.Class
import HydraSim.Types
-- | A node in the head protocol.
--
-- The declaration deliberately carries no @Tx tx =>@ datatype context: such
-- contexts require the deprecated @DatatypeContexts@ extension (which this
-- module does not enable) and never make the constraint available to code
-- that pattern matches on the constructor.  Functions that need @Tx tx@
-- state the constraint in their own signatures.
data HeadNode m tx = HeadNode {
  -- | Static configuration
  hnConf :: NodeConf tx,
  -- | Current local state
  hnState :: TMVar m (HState tx),
  -- | A 'Multiplexer' to handle communication with other nodes.
  hnMultiplexer :: Multiplexer m (HeadProtocol tx)
  }
-- | Create a node that can take part in the head protocol.
--
-- The node's state variable is initialised with 'hStateEmpty' applied to the
-- node id from the configuration, and a fresh 'Multiplexer' is created from
-- the shown node id, two buffer sizes and the two capacity functions.
newNode
  :: (MonadSTM m, Tx tx)
  => NodeConf tx
  -> (Size -> DiffTime)
  -- ^ Write capacity of the node's network device.
  --
  -- Determines how much time it takes to serialise and send a message of a
  -- given size.
  -> (Size -> DiffTime)
  -- ^ Read capacity of the node's network device.
  -> m (HeadNode m tx)
newNode conf writeCapacity readCapacity = do
  initialState <- newTMVarM (hStateEmpty nodeId)
  -- TODO: make buffer sizes configurable. The actual numbers don't really
  -- matter, but we do not want to be bounded by this.
  mplex <- newMultiplexer (show nodeId) 1000 1000 writeCapacity readCapacity
  return HeadNode
    { hnConf = conf
    , hnState = initialState
    , hnMultiplexer = mplex
    }
  where
    nodeId = hcNodeId conf
-- | Connect two nodes.
--
-- First the multiplexers of both nodes are handed to 'connect', together with
-- the action that creates the pair of channels.  Afterwards each node records
-- a 'VKey' built from the other node's id in the 'hsVKs' set of its state.
connectNodes
  :: forall m tx . (MonadAsync m, MonadTimer m, Tx tx)
  => m (Channel m (MessageEdge (HeadProtocol tx)),
        Channel m (MessageEdge (HeadProtocol tx)))
  -> HeadNode m tx
  -> HeadNode m tx
  -> m ()
connectNodes createChannels node node' = do
  connect createChannels (endpoint node) (endpoint node')
  registerPeer node (nodeIdOf node')
  registerPeer node' (nodeIdOf node)
  where
    -- The id a node was configured with.
    nodeIdOf :: HeadNode m tx -> NodeId
    nodeIdOf = hcNodeId . hnConf

    -- The (id, multiplexer) pair that 'connect' expects for one side.
    endpoint :: HeadNode m tx -> (NodeId, Multiplexer m (HeadProtocol tx))
    endpoint hn = (nodeIdOf hn, hnMultiplexer hn)

    -- Insert the peer's verification key into the node's state.  The new
    -- state is forced (to WHNF) before it is put back.
    registerPeer :: HeadNode m tx -> NodeId -> m ()
    registerPeer hn (NodeId peerIx) = atomically $ do
      let stateVar = hnState hn
      old <- takeTMVar stateVar
      putTMVar stateVar $! old { hsVKs = Set.insert (VKey peerIx) (hsVKs old) }
-- | Start a node.
--
-- Runs four actions concurrently: the event loop handling messages
-- ('listener'), the multiplexer, the thread sending transactions ('txSender')
-- and the thread making snapshots ('snDaemon').  The latter two act according
-- to the strategies specified in the node config.
--
-- Every thread except the multiplexer one first labels itself with the shown
-- node id.  The results of the concurrent actions are discarded.
startNode
  :: (MonadSTM m, MonadTimer m, MonadAsync m, MonadThrow m,
      Tx tx)
  => Tracer m (TraceHydraEvent tx)
  -> HeadNode m tx -> m ()
startNode tracer hn =
  void $
    concurrently (labelled (listener tracer hn)) $
      concurrently (startMultiplexer mpTracer (hnMultiplexer hn)) $
        concurrently
          (labelled (txSender tracer hn))
          (labelled (snDaemon tracer hn))
  where
    mpTracer = contramap HydraMessage tracer
    -- Label the current thread with this node's id, then run the action.
    labelled action = labelThisThread (show (hcNodeId (hnConf hn))) >> action
-- | Attach the given label to the thread that runs this action.
labelThisThread :: MonadAsync m => String -> m ()
labelThisThread nodeLabel = myThreadId >>= flip labelThread nodeLabel
-- | Read the node's current state (without taking it out of its 'TMVar') and
-- emit it on the tracer as a 'HydraState' event.
traceState
  :: (MonadSTM m, Tx tx)
  => Tracer m (TraceHydraEvent tx) -> HeadNode m tx -> m ()
traceState tracer hn =
  atomically (readTMVar (hnState hn)) >>= traceWith tracer . HydraState
-- | Add a message from the client (as opposed to from a node) to the message
-- queue.
--
-- This is used for triggering events like transaction submission or snapshot
-- creation.  The message is passed to 'sendToSelf' on the node's own
-- multiplexer, using the node's own id.
clientMessage
  :: (MonadSTM m, Tx tx)
  => Tracer m (TraceHydraEvent tx)
  -> HeadNode m tx
  -> HeadProtocol tx
  -> m ()
clientMessage tracer hn msg =
  sendToSelf (contramap HydraMessage tracer) mplex self msg
  where
    mplex = hnMultiplexer hn
    self = hcNodeId (hnConf hn)
-- | The actual logic of the node: an endless loop that takes the next message
-- from the multiplexer and feeds it, together with the current state, to the
-- protocol handler found in the node configuration.
--
-- The state is taken out of its 'TMVar' before the handler is consulted and
-- put back (updated or unchanged) once the handler's decision has been acted
-- upon.
listener
  :: forall m tx .
     (MonadSTM m, MonadTimer m, MonadAsync m,
      Tx tx)
  => Tracer m (TraceHydraEvent tx)
  -> HeadNode m tx -> m ()
listener tracer hn =
  forever (atomically (getMessage mplex) >>= uncurry applyMessage)
  where
    nodeConf = hnConf hn
    thisId = hcNodeId nodeConf
    mplex = hnMultiplexer hn
    stateVar = hnState hn

    mpTracer = contramap HydraMessage tracer
    protocolTracer = contramap HydraProtocol tracer

    -- Emit a free-form debug line.
    debug :: String -> m ()
    debug = traceWith (contramap HydraDebug tracer)

    -- Put a state back into the node's state variable.
    storeState :: HState tx -> m ()
    storeState = atomically . putTMVar stateVar

    -- Handle one message received from the given sender.
    applyMessage :: NodeId -> HeadProtocol tx -> m ()
    applyMessage sender msg = do
      debug ("applyMessage " ++ show sender
             ++ " " ++ show msg)
      current <- atomically (takeTMVar stateVar)
      debug (" state = " ++ show current)
      case hcProtocolHandler nodeConf nodeConf sender current msg of
        DecApply update event followUp -> do
          -- 'runComp' advances the time by the amount the handler takes,
          -- and unwraps the result
          !next <- runComp update
          storeState next
          debug (" state' = " ++ show next)
          traceWith protocolTracer event
          -- We refine the protocol specification, in spawning a new thread
          -- for computing and sending follow-up messages. Since we do not
          -- access the local state there, this is safe to do, and an obvious
          -- performance optimisation.
          void . async $ do
            labelThisThread (show thisId)
            runComp followUp >>= sendMessage
        DecWait comp -> do
          -- Not applicable yet: restore the old state and queue the message
          -- again.
          runComp comp
          storeState current
          reenqueue mpTracer mplex (sender, msg)
        DecInvalid comp errmsg -> do
          -- Invalid transition: trace it and restore the old state.
          runComp comp
          traceWith protocolTracer (TPInvalidTransition errmsg)
          storeState current

    -- Carry out the sending decision attached to an applied message.
    sendMessage :: SendMessage tx -> m ()
    sendMessage = \case
      SendNothing -> return ()
      SendTo dest out
        -- messages to the same node are just added to the inbox directly,
        -- without going over the network
        | dest == thisId -> sendToSelf mpTracer mplex thisId out
        | otherwise -> sendTo mplex dest out
      Multicast out -> multicast mpTracer mplex thisId out
-- | Submit transactions as client messages, according to the transaction
-- sending strategy in the node configuration.
--
-- * 'SendNoTx': do nothing.
-- * 'SendSingleTx': submit the one transaction.
-- * 'SendTxsDumb': submit all transactions, one after the other.
-- * 'SendTxs': submit the transactions one after the other, but before each
--   submission block until the 'hsTxsInflight' set of the node state holds
--   fewer than @limit@ entries, and add the transaction's reference to it.
txSender
  :: (MonadAsync m, Tx tx)
  => Tracer m (TraceHydraEvent tx)
  -> HeadNode m tx -> m ()
txSender tracer hn = case hcTxSendStrategy (hnConf hn) of
  SendNoTx -> return ()
  SendSingleTx tx -> submit tx
  SendTxsDumb txs -> mapM_ submit txs
  SendTxs limit txs -> mapM_ (submitWithin limit) txs
  where
    stateVar = hnState hn

    -- Hand a new transaction to the node as a client message.
    submit tx = clientMessage tracer hn (New tx)

    -- Wait for a free in-flight slot, claim it, then submit the transaction.
    submitWithin limit tx = do
      atomically $ do
        s <- takeTMVar stateVar
        let inflight = hsTxsInflight s
        if Set.size inflight < limit
          then putTMVar stateVar
                 (s { hsTxsInflight = Set.insert (txRef tx) inflight })
          else putTMVar stateVar s >> retry
      submit tx
-- | Trigger snapshots, according to the snapshot strategy in the node
-- configuration.
--
-- With 'NoSnapshots' nothing happens.  With @'SnapAfter' n@ the daemon loops
-- forever: it blocks until it is this node's turn (see @awaitTurn@), submits
-- a 'NewSn' client message, and starts over.
snDaemon
  :: forall m tx .
     (MonadSTM m, MonadAsync m, Tx tx)
  => Tracer m (TraceHydraEvent tx)
  -> HeadNode m tx -> m ()
snDaemon tracer hn = case hcSnapshotStrategy conf of
  NoSnapshots -> return ()
  SnapAfter n -> snapshotLoop n noSnapN
  where
    conf = hnConf hn

    -- Wait for our turn, request a snapshot, repeat with the snapshot number
    -- we just asked for.
    snapshotLoop :: Int -> SnapN -> m ()
    snapshotLoop n lastSn = do
      requested <- atomically (awaitTurn n lastSn)
      clientMessage tracer hn NewSn
      snapshotLoop n requested

    -- Retry until 'hsTxsConf' holds at least @n@ entries, the leader function
    -- names this node for the snapshot following 'hsSnapNConf', and
    -- 'hsSnapNConf' has caught up with the last snapshot we requested.
    -- Returns the number of the snapshot to request.
    awaitTurn :: Int -> SnapN -> STM m SnapN
    awaitTurn n lastSn = do
      s <- readTMVar (hnState hn)
      let confirmed = hsSnapNConf s
          upcoming = nextSn confirmed
          enoughTxs = Map.size (hsTxsConf s) >= n
          weLead = hcLeaderFun conf upcoming == hcNodeId conf
          -- to prevent filling our inbox with duplicate NewSn messages:
          notDuplicate = confirmed >= lastSn
      if enoughTxs && weLead && notDuplicate
        then return upcoming
        else retry