-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathreader.go
119 lines (111 loc) · 2.62 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package sse
import (
"bufio"
"bytes"
"io"
"strconv"
"strings"
)
// TODO: spec allows for a BOM at the beginning of the stream
func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
var (
eol = bytes.IndexAny(data, "\r\n")
crlf = 1
)
if eol == -1 {
if atEOF {
return len(data), data, nil
}
return 0, nil, nil
}
if data[eol] == '\r' {
if eol+1 < len(data) {
if data[eol+1] == '\n' {
crlf = 2
}
}
}
return eol + crlf, data[0:eol], nil
}
// Reader reads events from an io.Reader.
type Reader struct {
scanner *bufio.Scanner
// LastEventID maintains the ID of the last event received. If the last
// event did not contain an ID, then the value from the previous event is
// used. If no event with an ID has been received, this will be set to the
// zero value.
LastEventID string
// ReconnectionTime indicates how long the client should wait (in MS)
// before attempting to reconnect to the server. Note that this is set to
// the zero value unless the server changes it and a sensible default
// should be selected in place of the zero value.
ReconnectionTime int
}
// NewReader creates a new Reader instance for the provided io.Reader.
func NewReader(r io.Reader) *Reader {
scanner := bufio.NewScanner(r)
scanner.Split(scanLines)
return &Reader{
scanner: scanner,
}
}
// NextEvent blocks until the next event is received, there are no more events,
// or an error occurs. No event or error will be returned if there are no more
// events.
func (r *Reader) NextEvent() (*Event, error) {
var (
eventType = defaultMessageType
eventData []string
eventID = r.LastEventID
)
for len(eventData) == 0 {
for {
if !r.scanner.Scan() {
return nil, r.scanner.Err()
}
line := r.scanner.Bytes()
if len(line) == 0 {
break
}
if line[0] == ':' {
continue
}
var (
field []byte = line
value []byte
)
if i := bytes.IndexRune(line, ':'); i != -1 {
field = line[:i]
value = line[i+1:]
if len(value) != 0 && value[0] == ' ' {
value = value[1:]
}
}
switch string(field) {
case fieldNameEvent:
eventType = string(value)
case fieldNameData:
eventData = append(eventData, string(value))
case fieldNameID:
if !bytes.Contains(value, []byte{'\x00'}) {
eventID = string(value)
r.LastEventID = eventID
}
case fieldNameRetry:
i, err := strconv.Atoi(string(value))
if err != nil {
continue
}
r.ReconnectionTime = i
}
}
}
return &Event{
Type: eventType,
Data: strings.Join(eventData, "\n"),
ID: eventID,
}, nil
}