-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathProcess.idr
444 lines (363 loc) · 17.4 KB
/
Process.idr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
module Process
import System.Concurrency.Raw
import public Data.List -- public, to get proof search machinery
%access public
-- Process IDs are parameterised by their interface. A request of type
-- 'iface t' will get a response of type 't'.
data ProcID : (iface : Type -> Type) -> Type where
     -- Wraps the raw 'Ptr' that the evaluator hands to 'sendToThread'.
     MkPID : Ptr -> ProcID iface
-- A process ID whose interface is hidden, so that servers with different
-- interfaces can be stored together in one 'List ServerID'.
data ServerID : Type where
     MkServer : ProcID iface -> ServerID
-- Implicit conversion, so that a 'ProcID' can be written wherever a
-- 'ServerID' is expected.
implicit
MkServer' : ProcID iface -> ServerID
MkServer' pid = MkServer pid
-- Flag stored in a 'ProcState': 'YesR' once a reply has been made, 'NoR'
-- otherwise.
data Replied : Type where
     YesR : Replied
     NoR  : Replied
-- Handle identifying an outstanding request; wraps the request's number.
data ReqHandle : Type where
     MkReqH : Nat -> ReqHandle
-- Current state of a process includes:
--   * the servers it currently has an open connection to
--   * the number of clients it currently has connected
--   * whether it has responded to a request yet
-- Therefore, we can write process types which make clear that a process
-- cannot quit while it is talking to a server, or while it still has clients
-- expecting to communicate with it, or if it has not serviced any requests.
data ProcState : Type where
     MkProcState : (servers : List ServerID) ->
                   (clients : Nat) ->
                   Replied ->
                   ProcState
-- 'Pending h hs t' is a proof that the handle 'h' occurs in the list 'hs'
-- of outstanding requests, paired with the reply type 't'.
data Pending : ReqHandle -> List (ReqHandle, Type) -> Type -> Type where
     -- The entry is at the head of the list.
     PendingHere : Pending h ((h, t) :: hs) t
     -- The entry is somewhere in the tail of the list.
     PendingThere : Pending h hs t -> Pending h ((h', t') :: hs) t
-- Remove from the list of outstanding requests the entry that the given
-- 'Pending' proof points at; all other entries keep their order.
dropPending : (hs : List (ReqHandle, Type)) -> Pending h hs ty ->
              List (ReqHandle, Type)
dropPending ((_, _) :: rest) PendingHere = rest
dropPending ((h', t') :: rest) (PendingThere later)
    = (h', t') :: dropPending rest later
-- 'ConnectedTo s p' is a proof that server 's' is in the server list of
-- process state 'p'.
data ConnectedTo : ServerID -> ProcState -> Type where
     IsConnectedTo : Elem p servers ->
                     ConnectedTo p (MkProcState servers c reply)
-- Proof that a process state has a client count of zero.
data NoClient : ProcState -> Type where
     IsNoClient : NoClient (MkProcState servers 0 reply)
-- Proof that a process state has at least one client (the count is 'S k').
data OneClient : ProcState -> Type where
     IsOneClient : OneClient (MkProcState servers (S k) reply)
-- Proof that the reply flag of a process state is 'YesR'.
data Reply : ProcState -> Type where
     IsReply : Reply (MkProcState s c YesR)
-- Proof that the reply flag of a process state is 'NoR'.
data NoReply : ProcState -> Type where
     IsNoReply : NoReply (MkProcState s c NoR)
{-- Some useful operations on process state --}
-- Increment the client count, leaving servers and reply flag untouched.
newClient : ProcState -> ProcState
newClient (MkProcState conns n flag) = MkProcState conns (S n) flag
-- Overwrite the client count with the given number.
setClients : ProcState -> Nat -> ProcState
setClients (MkProcState conns _ flag) n = MkProcState conns n flag
-- Record a connection to a server by putting it at the front of the
-- server list.
newServer : ProcID iface -> ProcState -> ProcState
newServer pid (MkProcState conns n flag)
    = MkProcState (MkServer pid :: conns) n flag
-- Remove a server from the server list. The 'ConnectedTo' proof says where
-- in the list the server is, so the removal cannot fail.
dropServer : (pid : ProcID iface) -> (p : ProcState) ->
             ConnectedTo (MkServer pid) p -> ProcState
dropServer _ (MkProcState conns n flag) (IsConnectedTo elemPrf)
    = MkProcState (dropElem conns elemPrf) n flag
{-
pendingReq : ProcID iface -> (h : ReqHandle) -> iface t ->
ProcState hs -> ProcState ((h, t) :: hs)
pendingReq {t} p h x (MkProcState s c r) = MkProcState s c r
doneReq : ProcState hs -> (p : Pending h hs) -> ProcState (dropPending hs p)
doneReq (MkProcState s c r) p = MkProcState s c r
-}
-- Set the reply flag to 'YesR'.
replied : ProcState -> ProcState
replied (MkProcState conns n _) = MkProcState conns n YesR
-- Set the reply flag back to 'NoR'.
resetReplied : ProcState -> ProcState
resetReplied (MkProcState conns n _) = MkProcState conns n NoR
-- State of a server with no open server connections, 'S k' clients and no
-- reply made yet.
runningServer : Nat -> ProcState
runningServer k = MkProcState Nil (S k) NoR
-- State of a finished server: no server connections, no clients, and a
-- reply has been made.
doneServer : ProcState
doneServer = MkProcState Nil Z YesR
-- Initial state of a process connected to the given servers: no clients,
-- no reply made.
init : List ServerID -> ProcState
init conns = MkProcState conns Z NoR
{-- Processes themselves.
A process returns some type 'a', responds to requests on the interface
'iface', and has an input and output state.
--}
mutual
  -- 'Process a iface hs hs' p p'' describes a process which:
  --   * returns a value of type 'a'
  --   * answers requests on the interface 'iface'
  --   * starts with outstanding requests 'hs' and ends with 'hs' res'
  --   * starts in state 'p' and ends in state 'p' res'
  -- where 'res' is the value returned.
  data Process : (a : Type) -> (iface : Type -> Type) ->
                 List (ReqHandle, Type) -> (a -> List (ReqHandle, Type)) ->
                 ProcState -> (a -> ProcState) ->
                 Type where
       -- Some plumbing

       -- Run an IO action; pending requests and state are unchanged.
       Lift' : IO a -> Process a iface hs (const hs) p (const p)
       -- Return a value; pending requests and state are unchanged.
       Pure : a -> Process a iface hs (const hs) p (const p)
       -- Return a value, resetting the reply flag of the state.
       Quit : a -> Process a iface hs (const hs) p (const (resetReplied p))
       -- Sequencing: the second process starts from the pending requests
       -- and state in which the first one finished.
       bind : Process a iface hs hs' p p' ->
              ((x : a) -> Process b iface (hs' x) hs'' (p' x) p'') ->
              Process b iface hs hs'' p p''

       -- Start a server process. The server must run from
       -- 'runningServer 1' to 'doneServer'. The result is the new
       -- process's ID, which is added to the caller's server list.
       Fork : Process () serveri [] (const []) (runningServer 1) (const doneServer) ->
              Process (ProcID serveri) iface hs (const hs) p (\res => (newServer res p))

       -- Start 'worker' as a separate process, handing it a process ID
       -- with this process's interface, and carry on as 'waiter'.
       Work : (worker : (pid : ProcID iface) -> Worker [pid] ()) ->
              (waiter : Process t iface hs (const hs) (runningServer 1) (const doneServer)) ->
              Process t iface hs (const hs) p (const p)

       -- Send the request 'x' to a server we are connected to. The result
       -- is a handle, which is recorded as pending with reply type 'ty'.
       Request : (r : ProcID serveri) -> (x : serveri ty) ->
                 {auto connected : ConnectedTo (MkServer r) p} ->
                 Process ReqHandle iface
                         hs (\h => (h, ty) :: hs)
                         p (const p)

       -- Get the reply for a pending handle, which is then no longer
       -- pending.
       GetReply : (h : ReqHandle) ->
                  {auto pending : Pending h hs ty} ->
                  Process ty iface
                          hs (const (dropPending hs pending))
                          p (const p)

       -- Like 'Respond', but gives up and returns 'def' if no request has
       -- been received once the timeout is reached.
       -- NOTE(review): the unit of 'timeout' is not visible here - confirm
       -- against 'checkMsgsTimeout'.
       TimeoutRespond : (timeout : Int) ->
                        (def : res) ->
                        ({t : Type} -> (x : iface t) ->
                             Response (t, res) iface hs p) ->
                        Process res iface hs (const hs) p (const (replied p))

       -- Answer one request. The handler computes a pair: the first
       -- component is sent back to the requester, the second is the
       -- result of the 'Respond' itself. The state is marked as replied.
       Respond : ({t : Type} -> (x : iface t) ->
                      Response (t, res) iface hs p) ->
                 Process res iface hs (const hs) p (const (replied p))

       -- Try to connect to a server. The server is added to the state
       -- only if the result is 'True'.
       Connect : (r : ProcID serveri) ->
                 Process Bool iface
                         hs (const hs)
                         p (\ok => case ok of
                                        True => newServer r p
                                        False => p)

       -- Close the connection to a server we are connected to, removing
       -- it from the state.
       Disconnect : (r : ProcID serveri) ->
                    {auto connected : ConnectedTo (MkServer r) p} ->
                    Process () iface
                            hs (const hs)
                            p (const (dropServer r p connected))

       -- Return the number of clients; the client count in the output
       -- state is set to the number returned.
       CountClients : Process Nat iface hs (const hs) p (\n => setClients p n)

       -- FIXME: The process had better be guaranteed to change the system
       -- state (e.g. finish with a YesR since it starts with a NoR) because
       -- then it can't be used in a Respond, so responding can't loop.

       -- Continue as a (possibly infinite) process. Only allowed once a
       -- reply has been made; the continuation starts with the reply flag
       -- reset.
       Loop : {auto isreply : Reply p} ->
              Inf (Process t iface hs hs' (resetReplied p) p') ->
              Process t iface hs hs' p p'

  -- 'Running a iface' is the type of a process which is currently
  -- responding to requests (i.e. knows it has at least one client connected)
  -- and will not exit unless there are no clients connected
  Running : Type -> (iface : Type -> Type) -> Type
  Running a iface = {k : Nat} -> Process a iface [] (const []) (runningServer k) (const doneServer)

  -- 'Response a iface hs p' is the type of a process which changes neither
  -- the pending requests 'hs' nor the state 'p'; it is the type of the
  -- handlers given to 'Respond' and 'TimeoutRespond'.
  Response : Type -> (iface : Type -> Type) -> List (ReqHandle, Type) ->
             ProcState -> Type
  Response a iface hs p = Process a iface hs (const hs) p (const p)

  -- 'Program a' is the type of a process which does not respond to any requests
  -- and begins and ends with no connections to any server open.
  Program : Type -> (iface : Type -> Type) -> Type
  Program a iface = Process a iface [] (const []) (init []) (const (init []))

  -- 'Connected s a' is the type of a process which does not respond to any
  -- requests and begins and ends with connections to a given server list.
  Connected : List ServerID -> Type -> Type
  Connected s a = Process a (const Void) [] (const []) (init s) (const (init s))

  -- 'Worker s a' is the type of a process which does not respond to any
  -- requests, and begins with a connection to a server it is to send a
  -- notification to.
  Worker : List ServerID -> Type -> Type
  Worker s a = Process a (const Void) [] (const []) (init s) (const (init []))
-- Implicit conversion, so that an IO action can be written directly where
-- a 'Process' is expected.
implicit
Lift : IO a -> Process a iface hs (const hs) p (const p)
Lift action = Lift' action
-- Monadic-style bind for 'Process', which is what 'do' notation uses.
-- It is just the 'bind' constructor under another name.
%no_implicit -- helps error messages, and speeds things up a bit
%inline -- so that the productivity checker treats 'bind' as a constructor!
(>>=) : Process a iface hs hs' p p' ->
        ((x : a) -> Process b iface (hs' x) hs'' (p' x) p'') ->
        Process b iface hs hs'' p p''
(>>=) = bind
-- Connect to a server, send it one request, wait for the reply and
-- disconnect again. Returns 'Nothing' if the connection attempt fails,
-- otherwise 'Just' the reply. The process state is the same afterwards.
TrySend : (proc : ProcID iface) -> iface ty ->
          Process (Maybe ty) iface'
                  hs (const hs)
                  (MkProcState s c r) (const (MkProcState s c r))
TrySend server msg
    = do True <- Connect server | False => Pure Nothing
         handle <- Request server msg
         answer <- GetReply handle
         Disconnect server
         Pure (Just answer)
-- Send one request to a server we are already connected to (as witnessed
-- by 'prf') and wait for its reply.
Send : (proc : ProcID iface) -> iface ty ->
       {auto prf : Elem (MkServer proc) s} ->
       Process ty iface'
               hs (const hs)
               (MkProcState s c r) (const (MkProcState s c r))
Send server msg = Request server msg >>= \handle => GetReply handle
{--- evaluator --}
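-- The evaluator keeps track of the number of client connections open.
-- Incoming Connect/Close messages are queued whenever the message queue is
-- polled (e.g. when a 'Respond' or 'TimeoutRespond' is encountered), and
-- are folded into the client count when 'CountClients' is evaluated.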
-- Messages sent to a process which answers requests on 'iface'.
data MessageQ : (Type -> Type) -> Type where
     -- A client is opening a connection (counted by 'removeConn').
     ConnectMsg : MessageQ iface
     -- A client is closing its connection (counted by 'removeConn').
     CloseMsg : MessageQ iface
     -- A request, tagged with a number which is echoed back in the reply.
     RequestMsg : Nat -> iface t -> MessageQ iface
-- The reply to a request: the number the request was tagged with, and the
-- answer itself.
data MessageR : Type -> Type where
     ReplyMsg : Nat -> (ans : ty) -> MessageR ty
-- Everything that can arrive in a process's message queue: either a query
-- from a client, or a reply to a request this process made.
data Message : (Type -> Type) -> List (ReqHandle, Type) -> Type where
     MsgQuery : MessageQ iface -> Message iface hs
     MsgReply : (reply : MessageR ty) -> Message iface hs
-- Fetch the next message and its sender if 'checkMsgs' reports that one
-- is waiting; otherwise return 'Nothing'.
readMsg : IO (Maybe (Ptr, Message iface hs))
readMsg {iface} {hs} = do
    waiting <- checkMsgs
    if waiting
       then do received <- getMsgWithSender {a = Message iface hs}
               pure (Just received)
       else pure Nothing
-- As 'readMsg', but checks for a message with 'checkMsgsTimeout', passing
-- it the given timeout.
readMsgTimeout : Int -> IO (Maybe (Ptr, Message iface hs))
readMsgTimeout {iface} {hs} limit = do
    waiting <- checkMsgsTimeout limit
    if waiting
       then do received <- getMsgWithSender {a = Message iface hs}
               pure (Just received)
       else pure Nothing
-- An environment with one slot for each pending request in 'hs'. A slot
-- holds 'Nothing' until the reply has arrived and 'Just' the reply
-- afterwards.
data RespEnv : List (ReqHandle, Type) -> Type where
     Nil : RespEnv []
     (::) : Maybe ty -> RespEnv hs -> RespEnv ((h, ty) :: hs)
-- The state which the evaluator threads through a running process.
record EvalState (iface : Type -> Type) (hs : List (ReqHandle, Type)) where
  constructor MkEvalState
  -- Queries received but not yet dealt with, each with its sender
  queue : List (Ptr, MessageQ iface)
  -- Replies (if received yet) for the pending requests 'hs'
  reply : RespEnv hs
  -- Current count of connected clients
  clients : Nat
  -- Number to tag the next outgoing request with
  nexthandle : Nat
-- Read the slot for a pending request: 'Nothing' if the reply has not
-- arrived yet.
lookup : Pending h hs ty -> RespEnv hs -> Maybe ty
lookup PendingHere (slot :: _) = slot
lookup (PendingThere later) (_ :: rest) = lookup later rest
-- Remove the slot for a pending request from the environment, mirroring
-- what 'dropPending' does to the list of handles.
dropResp : (pending : Pending h hs ty) ->
           RespEnv hs -> RespEnv (dropPending hs pending)
dropResp PendingHere (_ :: rest) = rest
dropResp (PendingThere later) (slot :: rest) = slot :: dropResp later rest
-- Store a reply in the slot for a pending request, overwriting whatever
-- was there before.
updateReplies : Pending h hs ty -> ty -> RespEnv hs -> RespEnv hs
updateReplies PendingHere answer (_ :: rest) = Just answer :: rest
updateReplies (PendingThere later) answer (slot :: rest)
    = slot :: updateReplies later answer rest
-- Search the pending requests for a handle with the given number. On
-- success the result is the reply type recorded for that handle together
-- with a proof of where it is; 'Nothing' if no handle matches.
total
findPending : (h : ReqHandle) -> RespEnv hs -> Maybe (ty ** Pending h hs ty)
findPending {hs = []} k [] = Nothing
findPending {hs = ((MkReqH h, t) :: hs)} (MkReqH k) (x :: xs) with (decEq h k)
  -- The handle numbers are equal, so the entry is at the head.
  findPending {hs = ((MkReqH h, t) :: hs)} (MkReqH h) (x :: xs) | (Yes Refl)
       = Just (t ** PendingHere)
  -- Otherwise keep looking in the tail, and extend the proof if found.
  findPending {hs = ((MkReqH h, t) :: hs)} (MkReqH k) (x :: xs) | (No contra)
       = do (ty' ** p') <- findPending (MkReqH k) xs
            Just (ty' ** PendingThere p')
-- Drain every message currently waiting, without blocking. Queries go on
-- the end of the queue; replies are stored in the slot of the request they
-- answer. Returns once 'readMsg' finds nothing waiting.
covering
updateQueue : EvalState iface hs -> IO (EvalState iface hs)
updateQueue {iface} {hs} st = do
    incoming <- readMsg {iface} {hs}
    case incoming of
         Nothing => pure st
         Just (sender, MsgQuery query) =>
              updateQueue (record { queue = queue st ++ [(sender, query)] } st)
         Just (sender, MsgReply (ReplyMsg reqId answer)) =>
              case findPending (MkReqH reqId) (reply st) of
                   Nothing => updateQueue {iface} {hs} st -- can't happen!
                   Just (ty ** slot) =>
                        -- the reply's type is not tracked in the message,
                        -- hence the 'believe_me'
                        updateQueue (record { reply = updateReplies slot (believe_me answer) (reply st) } st)
-- Keep updating the queue with incoming messages until either we get a
-- RequestMsg, or we reach a timeout. Other queries are queued and replies
-- are stored, as in 'updateQueue', and then we wait again.
covering
updateQueueTimeout : Int -> EvalState iface hs -> IO (EvalState iface hs)
updateQueueTimeout {iface} {hs} limit st = do
    incoming <- readMsgTimeout {iface} {hs} limit
    case incoming of
         Nothing => pure st
         Just (sender, MsgQuery (RequestMsg reqId body)) =>
              pure (record { queue = queue st ++ [(sender, RequestMsg reqId body)] } st)
         Just (sender, MsgQuery query) =>
              -- TODO: Why not update client count here too?
              updateQueueTimeout limit (record { queue = queue st ++ [(sender, query)] } st)
         Just (sender, MsgReply (ReplyMsg reqId answer)) =>
              case findPending (MkReqH reqId) (reply st) of
                   Nothing => updateQueueTimeout limit {iface} {hs} st -- can't happen!
                   Just (ty ** slot) =>
                        updateQueueTimeout limit (record { reply = updateReplies slot (believe_me answer) (reply st) } st)
-- Take the first 'RequestMsg' out of a message list. The first argument
-- accumulates (in reverse) the messages skipped so far; callers start it
-- at '[]'. Returns the sender, the request with its number, and the
-- remaining messages in their original order.
total
removeReq : List (Ptr, MessageQ iface) -> List (Ptr, MessageQ iface) ->
            Maybe (Ptr, (ty ** (Nat, iface ty)), List (Ptr, MessageQ iface))
removeReq _ [] = Nothing
removeReq skipped ((sender, RequestMsg reqId body) :: rest)
    = Just (sender, (_ ** (reqId, body)), reverse skipped ++ rest)
removeReq skipped (other :: rest) = removeReq (other :: skipped) rest
-- Take all connect/close messages out of a message list, adjusting the
-- client count as we go: up by one per 'ConnectMsg', down by one (stopping
-- at zero) per 'CloseMsg'. The second argument accumulates (in reverse)
-- the messages kept; callers start it at '[]'.
total
removeConn : Nat ->
             List (Ptr, MessageQ iface) -> List (Ptr, MessageQ iface) ->
             (Nat, List (Ptr, MessageQ iface))
removeConn count kept [] = (count, reverse kept)
removeConn count kept ((_, ConnectMsg) :: rest) = removeConn (S count) kept rest
removeConn count kept ((_, CloseMsg) :: rest) = removeConn (pred count) kept rest
removeConn count kept (other :: rest) = removeConn count (other :: kept) rest
-- Remove the first thing in the event list which is a request, if it
-- exists. Returns the sender, the request (with its number) and the
-- evaluator state with that message taken out of the queue, or 'Nothing'
-- if no request is queued.
getRequest : EvalState iface hs -> Maybe (Ptr, (ty ** (Nat, iface ty)), EvalState iface hs)
getRequest (MkEvalState queue reply clients nh)
    = do (pid, req, queue') <- removeReq [] queue
         -- 'pure' rather than 'return', like every other do-block in this file
         pure (pid, req, MkEvalState queue' reply clients nh)
-- Process all queued connect/close messages. Returns the resulting client
-- count, and the state with that count stored and those messages removed
-- from the queue.
countClients : EvalState iface hs -> (Nat, EvalState iface hs)
countClients (MkEvalState pendingMsgs replies current nh)
    = case removeConn current [] pendingMsgs of
           (updated, remaining) =>
                (updated, MkEvalState remaining replies updated nh)
-- sendResponse : Ptr ->
-- (ty -> EvalState iface hs -> IO a) ->
-- (x, ty) -> EvalState iface hs ->
-- IO a
-- sendResponse {iface} {hs} pid k (resp, val) st = do
-- sendToThread pid (MsgReply {hs} {iface} ?prf (ReplyMsg resp))
-- k val st
-- Evaluate a process, in continuation-passing style. 'st' is the current
-- evaluator state; the continuation 'k' is called with the process's
-- result and the evaluator state after running it.
covering
eval : EvalState iface hs ->
       {p : ProcState} -> {p' : ty -> ProcState} ->
       Process ty iface hs hs' p p' ->
       ((res : ty) -> EvalState iface (hs' res) -> IO a) -> IO a
eval st (Lift' x) k = do x' <- x
                         k x' st
eval st (Pure x) k = k x st
-- At run time 'Quit' is the same as 'Pure'; they differ only in their types.
eval st (Quit x) k = k x st
eval st (bind x f) k = eval st x (\x', st' => eval st' (f x') k)
-- The forked server starts with an empty queue and a client count of 1.
eval st (Fork proc) k
    = do ptr <- fork (eval (MkEvalState [] [] 1 0) proc (\_, _ => pure ()))
         k (MkPID ptr) st
-- The worker is given 'prim__vm' as the process to talk to, and the
-- waiter runs with the client count increased by one.
eval st (Work proc cont) k
    = do ptr <- fork (eval (MkEvalState [] [] 0 0) (proc (MkPID prim__vm))
                           (\_, _ => pure ()))
         eval (record { clients = clients st + 1 } st) cont k
-- Tag the request with the next handle number, and add an empty slot for
-- its reply.
eval {hs} (MkEvalState q reqs c nh) (Request (MkPID pid) x) k
    = do sendToThread pid (MsgQuery {hs} (RequestMsg nh x))
         k (MkReqH nh) (MkEvalState q (Nothing :: reqs) c (S nh))
eval {p} st (GetReply {pending} h) k
    = do -- Need to keep receiving messages until there's a response
         -- available
         MkEvalState q reqs c nh <- updateQueue st
         case lookup pending reqs of
              -- No reply yet: wait for messages (with timeout 1), try again
              Nothing => do checkMsgsTimeout 1
                            eval {p} (MkEvalState q reqs c nh) (GetReply h) k
              Just reply =>
                   k reply (MkEvalState q (dropResp pending reqs) c nh)
-- If no request has been queued by the time 'updateQueueTimeout' returns,
-- the result is the default value 'def'.
eval {iface} {hs} st (TimeoutRespond timeout def f) k
    = do st' <- updateQueueTimeout timeout st
         case getRequest st' of
              Nothing => k def st'
              Just (pid, (_ ** (rq, req)), st'') => do
                   eval st'' (f req) -- (sendResponse pid k) -- ?foo
                        -- this weirdness works around an erasure bug
                        -- which causes a seg fault...
                        (\ r, st''' =>
                             case r of
                                  (resp, val) => do
                                       sendToThread pid (MsgReply {iface} {hs} (ReplyMsg rq resp))
                                       k val st''')
-- Keep polling until a request is queued, then run the handler, send the
-- first component of its result back to the requester (tagged with the
-- request's number) and continue with the second component.
eval {iface} {hs} st (Respond f) k
    = do st' <- updateQueue st
         case getRequest st' of
              Nothing => eval {iface} st' (Respond f) k
              Just (pid, (_ ** (rq, req)), st'') => do
                   eval st'' (f req) (\ (resp, val), st''' => do
                        sendToThread pid (MsgReply {iface} {hs} (ReplyMsg rq resp))
                        k val st''')
-- Connecting to 'prim__vm' always fails; otherwise the result is whether
-- 'sendToThread' returned 1.
eval {hs} st (Connect {serveri} (MkPID pid)) k
    = if pid == prim__vm then k False st else do
         v <- sendToThread pid (MsgQuery {iface=serveri} {hs}
                                         ConnectMsg)
         -- TODO: Wait for ACK
         k (v == 1) st
-- The result of sending the close message is ignored.
eval {hs} st (Disconnect {serveri} (MkPID pid)) k
    = do v <- sendToThread pid (MsgQuery {iface=serveri} {hs}
                                         CloseMsg)
         k () st
-- Bring the queue up to date first, so that the count reflects all
-- connect/close messages received so far.
eval st CountClients k
    = do st' <- updateQueue st
         let (cl, st'') = countClients st'
         k cl st''
eval st (Loop x) k = eval st x k
-- Run a complete program in IO, starting from an empty evaluator state
-- (no queued messages, no pending requests, no clients), and return its
-- result.
run : Program a iface -> IO a
run prog = eval (MkEvalState [] [] 0 0) prog (\result, _ => pure result)