-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhandler.go
124 lines (115 loc) · 3.45 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package gtcp
import (
"context"
"io"
"runtime"
"time"
)
// WriteFlusher is the interface that wraps the write operations on Conn:
// writing bytes and flushing whatever has been buffered so far.
type WriteFlusher interface {
	io.Writer

	// Flush writes any buffered data to the underlying Conn.
	Flush() error
}

// ConnHandler is the callback invoked once a Conn is ready to communicate
// with its peer. Use it when full control over the socket is required.
type ConnHandler func(context.Context, Conn)

// ReqHandler is the callback passed to SetKeepAliveHandler. It is invoked
// whenever the Conn is ready to communicate, that is:
//   - right after the connection has been accepted by the listener
//   - when one more byte arrives while the connection is in keepalive
//
// Together with SetKeepAliveHandler it makes keepalive easy to implement.
type ReqHandler func(Conn) error

// PipelineReader is the reading half passed to SetPipelineHandler, which
// makes protocol pipelining easy to implement. It reads from the connection
// and hands one meaningful []byte to the PipelineWriter through its return
// value.
type PipelineReader func(io.Reader) ([]byte, error)

// PipelineWriter is the writing half passed to SetPipelineHandler, which
// makes protocol pipelining easy to implement. It is called each time a
// meaningful []byte has been received from the PipelineReader.
type PipelineWriter func([]byte, WriteFlusher) error
// SetKeepAliveHandler installs a ConnHandler on s that makes keepalive easy
// to implement.
//
// The installed handler calls h and, as long as h returns nil, marks the
// connection idle, sets a read deadline of idle from now and peeks one byte.
// When the peek succeeds the idle mark and the deadline are cleared and h is
// called again. The handler returns when h returns an error, when the peek
// fails, or when ctx is found canceled after h returned (documented for this
// package as the listener having been closed).
func (s *Server) SetKeepAliveHandler(idle time.Duration, h ReqHandler) {
	s.ConnHandler = func(ctx context.Context, conn Conn) {
		// A non-nil error from h ends the loop: the connection is not reused
		// once something went wrong.
		for h(conn) == nil {
			// Non-blocking check whether the parent canceled us.
			select {
			case <-ctx.Done():
				return
			default:
			}

			// Wait in the idle state for the next byte, at most for idle.
			conn.SetIdle(true)
			conn.SetReadDeadline(time.Now().Add(idle))
			if _, err := conn.Peek(1); err != nil {
				return
			}

			// Leave the idle state and drop the deadline before handing the
			// connection back to h.
			conn.SetIdle(false)
			conn.SetReadDeadline(time.Time{})
		}
	}
}
// SetPipelineHandler installs a ConnHandler on s that makes protocol
// pipelining easy to implement by running the read side and the write side
// concurrently.
//
// pr implements the reading part. It is called repeatedly in its own
// goroutine, and every non-nil []byte it returns is queued on a channel whose
// capacity is numBuf. pw implements the writing part. It is called on the
// handler's goroutine once per queued []byte, in queue order.
//
// The reader stops when pr returns a nil buf or a non-nil error, or when the
// handler's ctx is canceled (documented for this package as the listener
// having been closed). The handler returns after the reader has stopped and
// every queued buf has been passed to pw. An error returned by pw does not
// stop the handler.
//
// A panic raised by pr is recovered in the reader goroutine. Unless its value
// is ErrAbortHandler it is logged through s.Logger together with a stack
// trace. Either way the reader ends.
func (s *Server) SetPipelineHandler(
	numBuf int,
	pr PipelineReader,
	pw PipelineWriter) {
	s.ConnHandler = func(ctx context.Context, conn Conn) {
		// Derive a context that is canceled as soon as this handler returns,
		// including when pw panics. Without it, a reader blocked on a full
		// packet channel could only be released by the parent ctx and would
		// leak until then.
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		packet := make(chan []byte, numBuf)

		// reader
		go func() {
			defer func() {
				if r := recover(); r != nil && r != ErrAbortHandler {
					const size = 64 << 10
					stack := make([]byte, size)
					stack = stack[:runtime.Stack(stack, false)]
					s.Logger.Errorf("gtcp: panic serving %v: %v\n%s", conn.RemoteAddr(), r, stack)
				}
				// Closing packet is what tells the writer loop below to finish.
				close(packet)
			}()
			for {
				buf, err := pr(conn)
				if buf == nil || err != nil {
					return
				}
				select {
				case packet <- buf:
				case <-ctx.Done():
					// canceled by parent, or the writer side is gone
					return
				}
			}
		}()

		// writer: runs until the reader closes packet, i.e. the context was
		// canceled, the tcp session was closed or the reader hit an error.
		for buf := range packet {
			// A pw error is deliberately not fatal: keep draining packet
			// until the reader fails, so the reader never blocks on a send.
			_ = pw(buf, conn)
		}
	}
}