forked from elm-lang/websocket
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWebSocket.elm
370 lines (256 loc) · 8.61 KB
/
WebSocket.elm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
effect module WebSocket where { command = MyCmd, subscription = MySub } exposing
( send
, listen
, onOpen
, onClose
)
{-| Web sockets make it cheaper to talk to your servers.
Connecting to a server takes some time, so with web sockets, you make that
connection once and then keep using it. The major benefits of this are:
1. It is faster to send messages. No need to do a bunch of work for every single
message.
2. The server can push messages to you. With normal HTTP you would have to
keep *asking* for changes, but with a web socket, the server can talk to you
whenever it wants. This means there is less unnecessary network traffic.
The API here attempts to cover the typical usage scenarios, but if you need
many unique connections to the same endpoint, you need a different library.
# Web Sockets
@docs listen, onOpen, onClose, send
-}
import Dict
import Process
import Task exposing (Task)
import WebSocket.LowLevel as WS
-- COMMANDS
-- The single command this effect manager handles.
-- `Send` holds two strings; `send` constructs it as `Send url message` and
-- `cmdHelp` uses the first one as the key into the sockets dictionary.
type MyCmd msg
= Send String String
{-| Send a message to a particular address. For example:

    send "ws://echo.websocket.org" "Hello!"

**Note:** The message is only written to a socket that is currently connected
for that address, and sockets are only opened for addresses that have a
`listen` subscription. If no connected socket exists for the address, the
message is not sent.
-}
send : String -> String -> Cmd msg
send url message =
    Send url message
        |> command
-- Commands carry no taggers, so mapping only needs to rebuild the value at
-- the new message type; the mapping function itself is ignored.
cmdMap : (a -> b) -> MyCmd a -> MyCmd b
cmdMap _ cmd =
    case cmd of
        Send url msg ->
            Send url msg
-- SUBSCRIPTIONS
-- A subscription handled by this effect manager.
-- Fields, in order: a category string ("listen", "onOpen" or "onClose" as built
-- by the public functions), a URL (the empty string for "onOpen"/"onClose"),
-- and the tagger that turns a string payload into an app message.
type MySub msg
= MySub String String (String -> msg)
{-| Subscribe to the messages arriving on the web socket at the given address.
For example:

    type Msg = Echo String | ...

    subscriptions model =
        listen "ws://echo.websocket.org" Echo

**Note:** When the connection is lost or cannot be opened, the effect manager
keeps trying to reconnect, waiting exponentially longer between attempts.
-}
listen : String -> (String -> msg) -> Sub msg
listen url tagger =
    MySub "listen" url tagger
        |> subscription
{-| Subscribe to web socket open events. The tagger receives the address of
the socket that was opened. For example:

    type Msg = WsOpened String | ...

    subscriptions model =
        onOpen WsOpened
-}
onOpen : (String -> msg) -> Sub msg
onOpen tagger =
    MySub "onOpen" "" tagger
        |> subscription
{-| Subscribe to web socket close events. The tagger receives the address of
the connected socket that was closed. For example:

    type Msg = WsClosed String | ...

    subscriptions model =
        onClose WsClosed
-}
onClose : (String -> msg) -> Sub msg
onClose tagger =
    MySub "onClose" "" tagger
        |> subscription
-- Map over the message type of a subscription by post-composing the given
-- function onto its tagger; category and URL are left untouched.
subMap : (a -> b) -> MySub a -> MySub b
subMap func (MySub category url tagger) =
    MySub category url (\payload -> func (tagger payload))
-- MANAGER
-- Effect manager state: the connections currently tracked (`sockets`) and
-- the subscriptions registered by the app on the last `onEffects` call (`subs`).
type alias State msg =
{ sockets : SocketsDict
, subs : SubsDict msg
}
-- Connections keyed by socket name; the name is the URL passed to `listen`
-- (see `buildEntriesDict` and `updateSocket`).
type alias SocketsDict =
Dict.Dict String Connection
-- Taggers grouped in two levels: the outer key is the subscription category
-- ("listen", "onOpen", "onClose"), the inner key is the URL field of the
-- subscription (see `buildSubDict`).
type alias SubsDict msg =
Dict.Dict String (Dict.Dict String (String -> msg))
-- Lifecycle of one tracked socket.
-- `Opening n pid`: an open attempt is in flight; `n` is the backoff step used
-- for it (0 for the first attempt, incremented by `retryConnection`) and `pid`
-- is the process spawned by `attemptOpen`, killed if the socket is dropped.
-- `Connected socket`: the low-level socket is open and can be written to.
type Connection
= Opening Int Process.Id
| Connected WS.WebSocket
-- Initial manager state: no sockets tracked and no subscriptions registered.
init : Task Never (State msg)
init =
    Task.succeed
        { sockets = Dict.empty
        , subs = Dict.empty
        }
-- HANDLE APP MESSAGES
-- Sequence two tasks: run the first, discard its result, then run the second.
(&>) : Task x a -> Task x b -> Task x b
(&>) first second =
    first
        |> Task.andThen (\_ -> second)
-- Reconcile the manager with the app's current commands and subscriptions.
--
-- Outgoing commands are dispatched first, against the sockets that exist
-- *before* reconciliation. Then the set of URLs wanted by `listen`
-- subscriptions is merged with the sockets currently tracked:
--
--   * wanted but not tracked  -> start opening (backoff step 0)
--   * wanted and tracked      -> keep the existing connection as is
--   * tracked but not wanted  -> close it / kill the pending open attempt
--
-- The resulting state holds the reconciled sockets and the freshly built
-- subscription dictionary.
onEffects
    : Platform.Router msg Msg
    -> List (MyCmd msg)
    -> List (MySub msg)
    -> State msg
    -> Task Never (State msg)
onEffects router cmds subs state =
    let
        wantedUrls =
            buildEntriesDict subs Dict.empty

        openMissing url _ pending =
            pending
                |> Task.andThen
                    (\sockets ->
                        attemptOpen router 0 url
                            |> Task.map (\pid -> Dict.insert url (Opening 0 pid) sockets)
                    )

        keepExisting url _ connection pending =
            pending
                |> Task.map (\sockets -> Dict.insert url connection sockets)

        closeStale _ connection pending =
            closeConnection connection
                |> Task.andThen (\_ -> pending)

        reconciledSockets =
            Dict.merge
                openMissing
                keepExisting
                closeStale
                wantedUrls
                state.sockets
                (Task.succeed Dict.empty)
    in
        cmdHelp router cmds state.sockets
            |> Task.andThen (\_ -> reconciledSockets)
            |> Task.map
                (\sockets ->
                    { sockets = sockets
                    , subs = buildSubDict subs Dict.empty
                    }
                )
-- Dispatch every `Send` command, in order, to its socket.
--
-- A message is written only when the socket named by the command is currently
-- `Connected`. Messages for unknown or still-opening sockets are dropped
-- (nothing is queued), but the remaining commands are still processed, so one
-- undeliverable message no longer discards the rest of the batch.
--
-- The sockets dictionary is returned unchanged.
cmdHelp : Platform.Router msg Msg -> List (MyCmd msg) -> SocketsDict -> Task Never SocketsDict
cmdHelp router cmds socketsDict =
    case cmds of
        [] ->
            Task.succeed socketsDict

        (Send name msg) :: rest ->
            case Dict.get name socketsDict of
                Just (Connected socket) ->
                    WS.send socket msg
                        &> cmdHelp router rest socketsDict

                _ ->
                    -- Not connected: drop this message only and carry on.
                    cmdHelp router rest socketsDict
-- Fold the subscriptions, front to back, into the given dictionary: each one
-- is registered under its category key by applying `set` to the pair of its
-- URL field and tagger.
buildSubDict : List (MySub msg) -> SubsDict msg -> SubsDict msg
buildSubDict subs dict =
    let
        register (MySub category name tagger) acc =
            Dict.update category (set ( name, tagger )) acc
    in
        List.foldl register dict subs
-- Collect the URLs that need a socket. Only subscriptions in the "listen"
-- category contribute: each ensures its URL is present as a key, keeping any
-- value already stored there and defaulting to the empty list otherwise.
-- Subscriptions in other categories leave the dictionary untouched.
buildEntriesDict : List (MySub msg) -> Dict.Dict String (List a) -> Dict.Dict String (List a)
buildEntriesDict subs dict =
    let
        ensureEntry (MySub category name _) acc =
            if category == "listen" then
                Dict.update name (\existing -> Just (Maybe.withDefault [] existing)) acc
            else
                acc
    in
        List.foldl ensureEntry dict subs
-- Updater for `Dict.update`: add a key/value pair to the dictionary stored
-- under some outer key.
--
-- When nothing is stored yet, a one-entry dictionary is created. When a
-- dictionary already exists, the pair is inserted into it (replacing any
-- previous value for the same key) and all other entries are kept.
-- Previously the existing dictionary was discarded and replaced by a
-- singleton, so only the most recently added pair survived.
set : (comparable, b) -> Maybe (Dict.Dict comparable b) -> Maybe (Dict.Dict comparable b)
set ( key, item ) maybeDict =
    case maybeDict of
        Nothing ->
            Just (Dict.singleton key item)

        Just existing ->
            Just (Dict.insert key item existing)
-- HANDLE SELF MESSAGES
-- Messages the effect manager sends to itself. The first string of every
-- constructor is the socket name (the URL it was opened with).
-- `Receive name text`: a message arrived on the socket.
-- `Die name`: the socket's `onClose` callback fired.
-- `GoodOpen name socket`: an open attempt succeeded.
-- `BadOpen name`: an open attempt failed.
type Msg
= Receive String String
| Die String
| GoodOpen String WS.WebSocket
| BadOpen String
-- React to the manager's own messages.
--
--   * `Receive name str`: forward `str` to the `listen` tagger registered
--     for the socket `name`. Previously `name` was ignored and the message
--     went to every `listen` tagger, so traffic from one socket reached
--     listeners of other URLs. If no tagger is registered for `name`, the
--     message is dropped.
--   * `Die name`: if the socket was connected, notify the "onClose" taggers
--     with `name` and start reopening at backoff step 0; if it was still
--     opening, retry with the next backoff step; if it is not tracked, do
--     nothing.
--   * `GoodOpen name socket`: notify the "onOpen" taggers with `name` and
--     record the socket as connected.
--   * `BadOpen name`: retry with the next backoff step if the socket is still
--     tracked as opening; otherwise do nothing.
onSelfMsg : Platform.Router msg Msg -> Msg -> State msg -> Task Never (State msg)
onSelfMsg router selfMsg state =
    let
        -- Send `payload` to every tagger registered under `category`.
        notifyAll category payload =
            Dict.get category state.subs
                |> Maybe.withDefault Dict.empty
                |> Dict.values
                |> List.map (\tagger -> Platform.sendToApp router (tagger payload))
                |> Task.sequence
    in
        case selfMsg of
            Receive name str ->
                case Dict.get "listen" state.subs |> Maybe.andThen (Dict.get name) of
                    Just tagger ->
                        Platform.sendToApp router (tagger str)
                            |> Task.andThen (\_ -> Task.succeed state)

                    Nothing ->
                        Task.succeed state

            Die name ->
                case Dict.get name state.sockets of
                    Nothing ->
                        Task.succeed state

                    Just (Connected _) ->
                        notifyAll "onClose" name
                            |> Task.andThen (\_ -> attemptOpen router 0 name)
                            |> Task.map (\pid -> updateSocket name (Opening 0 pid) state)

                    Just (Opening n _) ->
                        retryConnection router n name state

            GoodOpen name socket ->
                notifyAll "onOpen" name
                    |> Task.map (\_ -> updateSocket name (Connected socket) state)

            BadOpen name ->
                case Dict.get name state.sockets of
                    Nothing ->
                        Task.succeed state

                    Just (Opening n _) ->
                        retryConnection router n name state

                    Just (Connected _) ->
                        Task.succeed state
-- Start another open attempt for `name` using the next backoff step (`n + 1`)
-- and record the socket as `Opening` with that step and the new process id.
retryConnection
    : Platform.Router msg Msg
    -> Int
    -> String
    -> State msg
    -> Task x (State msg)
retryConnection router n name state =
    let
        nextStep =
            n + 1
    in
        attemptOpen router nextStep name
            |> Task.map (\pid -> updateSocket name (Opening nextStep pid) state)
-- Store `connection` under `name` in the state's sockets dictionary,
-- replacing any connection previously stored for that name.
updateSocket : String -> Connection -> State msg -> State msg
updateSocket name connection state =
    let
        updatedSockets =
            Dict.insert name connection state.sockets
    in
        { state | sockets = updatedSockets }
-- OPENING WEBSOCKETS WITH EXPONENTIAL BACKOFF
-- Spawn a process that waits for the delay belonging to the given backoff
-- step, then tries to open the socket `name`. The outcome is reported back to
-- the manager as `GoodOpen name socket` on success or `BadOpen name` on
-- failure. Returns the id of the spawned process so that it can be killed.
attemptOpen : Platform.Router msg Msg -> Int -> String -> Task x Process.Id
attemptOpen router backoff name =
    let
        connectAndReport =
            open name router
                |> Task.andThen (\ws -> Platform.sendToSelf router (GoodOpen name ws))
                |> Task.onError (\_ -> Platform.sendToSelf router (BadOpen name))
    in
        after backoff
            |> Task.andThen (\_ -> connectAndReport)
            |> Process.spawn
-- Open the low-level socket for `name`, wiring its callbacks to the manager:
-- incoming messages become `Receive name msg` and a close becomes `Die name`.
open : String -> Platform.Router msg Msg -> Task WS.BadOpen WS.WebSocket
open name router =
    let
        toSelf selfMsg =
            Platform.sendToSelf router selfMsg

        settings =
            { onMessage = \_ msg -> toSelf (Receive name msg)
            , onClose = \_ -> toSelf (Die name)
            }
    in
        WS.open name settings
-- Delay for a backoff step: steps below 1 do not wait at all; step `n >= 1`
-- sleeps for `10 * 2 ^ n`, passed to `Process.sleep`.
-- NOTE(review): the delay is not capped, so it keeps doubling for as long as
-- retries keep failing — confirm this is intended.
after : Int -> Task x ()
after backoff =
    if backoff >= 1 then
        Process.sleep (toFloat (10 * 2 ^ backoff))
    else
        Task.succeed ()
-- CLOSE CONNECTIONS
-- Tear down a connection: close the socket when it is connected, or kill the
-- process performing the open attempt when it is still opening.
closeConnection : Connection -> Task x ()
closeConnection connection =
    case connection of
        Connected socket ->
            WS.close socket

        Opening _ pid ->
            Process.kill pid