-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpub.go
94 lines (80 loc) · 2.21 KB
/
pub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package gosaqws
import (
"encoding/json"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// PubStub is used by the publisher to store the messages of the current
// session and to serve those messages to each subscriber.
//
// The zero value is ready to use. Because it embeds a mutex, a PubStub must
// be shared by pointer and never copied after first use.
type PubStub struct {
// m guards sessionId and q.
m sync.RWMutex
// sessionId is incremented by NewSession; a subscriber handler compares it
// with the value it last saw to detect that a new session has started.
sessionId int
// q holds the messages appended during the current session, in append order.
q []json.RawMessage
}
// Install registers the publisher's websocket handler under path on the
// default net/http serve mux.
//
// Using Install is optional: p.HandleSAQWS is an ordinary HTTP handler
// function and can be registered directly instead.
func Install(path string, p *PubStub) {
	handler := http.HandlerFunc(p.HandleSAQWS)
	http.Handle(path, handler)
}
// NewSession begins a fresh publishing session.
//
// Every message appended so far is dropped and the session counter is
// advanced, both under the write lock.
func (p *PubStub) NewSession() {
	p.m.Lock()
	p.q = nil
	p.sessionId += 1
	p.m.Unlock()
}
// Append adds one raw JSON message to the end of the queue of messages to be
// published. The slice is stored as-is (not copied), under the write lock.
func (p *PubStub) Append(data json.RawMessage) {
	p.m.Lock()
	p.q = append(p.q, data)
	p.m.Unlock()
}
// upgrader upgrades incoming HTTP requests to websocket connections. It is
// the zero-value Upgrader, so every option is left at the library default.
var upgrader websocket.Upgrader
// HandleSAQWS is the HTTP handler that serves one subscriber.
//
// It upgrades the request to a websocket, sends everything already queued in
// the current session, and then polls every 10ms for newly appended messages.
// When a new session has started since the last poll, the cursor is reset so
// the subscriber receives the new session from its first message.
//
// The handler returns, closing the connection, as soon as the upgrade fails
// or sending to the subscriber reports an error. Otherwise it runs for the
// lifetime of the connection.
func (p *PubStub) HandleSAQWS(w http.ResponseWriter, r *http.Request) {
	clientConn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("upgrade error:", err)
		return
	}
	defer func() {
		if err := clientConn.Close(); err != nil {
			log.Println("ERROR while closing websocket:", err)
		}
	}()

	// Catch up on all that is already available in the session.
	// NOTE: the read lock is held while writing to the socket, so NewSession
	// and Append wait until the write has finished.
	p.m.RLock()
	sessionId := p.sessionId
	cursor, err := p.sendCurrentSessionBacklog(clientConn, 0)
	p.m.RUnlock()
	if err != nil {
		log.Println("ERROR while sending backlog, closing websocket:", err)
		return
	}

	// Poll for updates (busy-wait with a short sleep, to be improved).
	for {
		time.Sleep(10 * time.Millisecond)

		p.m.RLock()
		// A different session id means the queue was reset: start over at 0.
		if sessionId != p.sessionId {
			cursor = 0
			sessionId = p.sessionId
		}
		cursor, err = p.sendCurrentSessionBacklog(clientConn, cursor)
		p.m.RUnlock()

		// Stop instead of looping forever on a connection that can no longer
		// be served; the deferred Close releases it.
		if err != nil {
			log.Println("ERROR while sending backlog, closing websocket:", err)
			return
		}
	}
}
// sendCurrentSessionBacklog sends the queued messages from index cursorStart
// up to the end of the queue to clientConn, marshalled as a single JSON array
// in one text message. Nothing is sent when there are no messages at or after
// cursorStart.
//
// The caller must hold p.m (for reading or writing): p.q is read here without
// taking the lock.
//
// It returns the cursor to use for the next call. On success that is the
// current queue length. When marshalling or writing fails, the error is
// returned together with cursorStart, so undelivered messages are not
// counted as sent.
func (p *PubStub) sendCurrentSessionBacklog(clientConn *websocket.Conn, cursorStart int) (cursor int, err error) {
	end := len(p.q)
	if cursorStart >= end {
		// Nothing new to deliver.
		return end, nil
	}

	// err here is the named result (same scope), not a shadowing variable, so
	// failures below really reach the caller.
	raw, err := json.Marshal(p.q[cursorStart:end])
	if err != nil {
		log.Println("ERROR:marshalling slice of q:", err)
		return cursorStart, err
	}
	if err = clientConn.WriteMessage(websocket.TextMessage, raw); err != nil {
		return cursorStart, err
	}
	return end, nil
}