-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsrv.go
315 lines (275 loc) · 7.91 KB
/
srv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
package cs
import (
"errors"
"fmt"
"sync"
"time"
"github.com/gogf/gf/os/gcache"
)
// Srv 基于命令的消息处理框架
type Srv struct {
Server []ServerAdapter // 服务器适配器
serverMu sync.Mutex
isRunning bool // 服务是否已经正在运行
runErr chan error // 服务运行错误通知
middleware []HandlerFunc // 全局路由中间件
pushMiddleware []PushHandlerFunc // 全局推送中间件
internalMiddleware []HandlerFunc // 内部的中间件,执行顺序在洋葱模型的最里层
routes map[string][]HandlerFunc // 路由的处理函数
state *State // SID 会话的状态数据
}
// New 指定服务器实例化一个消息服务
func New(server ...ServerAdapter) *Srv {
srv := &Srv{
Server: server,
runErr: make(chan error, 0),
routes: map[string][]HandlerFunc{},
state: &State{cache: gcache.New()},
}
// 推送前填充数据
srv.UsePush(fillPushResp)
// 内部中间件
srv.useInternal(internalPanicHandler)
return srv
}
// AddServer 增加服务适配器
func (s *Srv) AddServer(server ...ServerAdapter) *Srv {
s.serverMu.Lock()
s.Server = append(s.Server, server...)
// 如果服务已经正在 running 了,增加的时候自动启动
if s.isRunning {
for _, ser := range server {
go s.startServer(ser)
}
}
s.serverMu.Unlock()
return s
}
// SetStateExpire 设置会话的状态有效时长
func (s *Srv) SetStateExpire(t time.Duration) *Srv {
s.state.keyExpireTimeout = t
return s
}
// SetStateAdapter 设置状态管理的存储适配器,默认是存储在内存中,可设置为其他
func (s *Srv) SetStateAdapter(adapter gcache.Adapter) *Srv {
s.state.SetAdapter(adapter)
return s
}
// Use 增加全局中间件
func (s *Srv) Use(handlers ...HandlerFunc) *Srv {
s.middleware = append(s.middleware, handlers...)
return s
}
func (s *Srv) useInternal(handlers ...HandlerFunc) *Srv {
s.internalMiddleware = append(s.internalMiddleware, handlers...)
return s
}
// UsePush 增加推送中间件,该类中间件只会在使用 *Context 服务器主动推送的场景下才会被调用,如 Push, Broadcast, PushSID,在请求-响应模式时不会被调用,使用 ctx.Srv 调用也不会被触发
func (s *Srv) UsePush(handlers ...PushHandlerFunc) *Srv {
s.pushMiddleware = append(s.pushMiddleware, handlers...)
return s
}
// Group 路由分组,指定该分组下的中间件
func (s *Srv) Group(handlers ...HandlerFunc) *SrvGroup {
srv := &SrvGroup{
parent: nil,
middleware: handlers,
srv: s,
}
return srv
}
var routeMu sync.Mutex
// Handle 注册路由,cmd 是命令, handlers 是该路由的处理函数
func (s *Srv) Handle(cmd string, handlers ...HandlerFunc) *Srv {
srv := s
if len(handlers) == 0 {
return s
}
hs, ok := srv.routes[cmd]
if ok {
hs = append(hs, handlers...)
} else {
hs = handlers
}
routeMu.Lock()
srv.routes[cmd] = hs
routeMu.Unlock()
return s
}
// Push 往指定的会话 SID 连接推送消息
func (s *Srv) Push(sid string, resp *Response) error {
resp.fill()
server, err := s.getSidServer(sid)
if err != nil {
return err
}
return s.PushServer(server, sid, resp)
}
// PushServer 往指定适配器的 sid 推送消息
func (s *Srv) PushServer(server ServerAdapter, sid string, resp *Response) error {
resp.fill()
return server.Write(sid, resp)
}
// Broadcast 往所有可用的会话推送消息
func (s *Srv) Broadcast(resp *Response) {
// resp.fill()
for _, server := range s.Server {
for _, sid := range s.GetAllSID() {
server.Write(sid, resp)
}
}
}
// Close 关闭指定会话 SID 的连接
func (s *Srv) Close(sid string) error {
server, err := s.getSidServer(sid)
if err != nil {
return errors.New("the sid is already close")
}
return s.CloseWithServer(server, sid)
}
// CloseWithServer 关闭指定适配器的指定sid,该方法效率比 Close 高
func (s *Srv) CloseWithServer(server ServerAdapter, sid string) error {
return server.Close(sid)
}
// GetState 获取指定会话的指定状态值
func (s *Srv) GetState(sid, key string) interface{} {
return s.state.Get(sid, key)
}
// SetState 设置指定连接的状态
func (s *Srv) SetState(sid, key string, val interface{}) {
s.state.Set(sid, key, val)
}
// NewContext 根据请求消息实例化上下文
// 应该在实现 adapter 时才有用
func (s *Srv) NewContext(server ServerAdapter, sid string, req *Request) *Context {
ctx := &Context{
Response: &Response{
Request: req,
Cmd: req.Cmd,
Seqno: req.Seqno,
Code: -1,
Msg: msgUnsupportCmd,
Data: struct{}{},
},
SID: sid,
Srv: s,
Server: server,
}
routeHandlers, ok := s.routes[req.Cmd]
var handlers []HandlerFunc
if ok {
handlers = make([]HandlerFunc, 0, len(s.middleware)+len(routeHandlers)+len(s.internalMiddleware))
handlers = append(handlers, s.middleware...)
handlers = append(handlers, s.internalMiddleware...)
handlers = append(handlers, routeHandlers...)
ctx.OK() // 匹配到了路由,但是 handler 没有设置响应
} else {
handlers = make([]HandlerFunc, 0, len(s.middleware)+1)
handlers = append(handlers, s.middleware...)
}
ctx.handlers = handlers
ctx.handlerIndex = -1
return ctx
}
// CallContext 调用上下文,触发上下文中间件
// 应该在实现 adapter 时才有用
func (s *Srv) CallContext(ctx *Context) {
for !ctx.handlerAbort && ctx.handlerIndex < len(ctx.handlers) {
ctx.Next()
}
}
// 当有新的会话SID产生时触发,依赖内置命令 CmdConnected 实现
func (s *Srv) onSidConnected(sid string) {}
// 当有会话SID关闭时触发,依赖内置命令 CmdClosed 实现
func (s *Srv) onSidClosed(sid string) {
s.state.destroySid(sid)
}
// 接收服务器适配器产生的消息,并执行路由处理函数
func (s *Srv) startServer(server ServerAdapter) {
for {
sid, req, err := server.Read(s)
if err != nil {
s.runErr <- err
return
}
if req == nil {
s.runErr <- errors.New("unexpected request data")
return
}
// handler cmd
go func(sid string, req *Request) {
ctx := s.NewContext(server, sid, req)
s.CallContext(ctx) // 为什么会卡死在这不回复
// internal will not response
if req.Cmd != CmdConnected &&
req.Cmd != CmdClosed &&
req.Cmd != CmdHeartbeat {
s.PushServer(server, sid, ctx.Response)
}
// call internal hooks
switch req.Cmd {
case CmdConnected:
s.onSidConnected(sid)
case CmdClosed:
s.onSidClosed(sid)
}
}(sid, req)
}
}
// GetAllSID 获取所有适配器的 SID
func (s *Srv) GetAllSID() []string {
sids := []string{}
for _, server := range s.Server {
sids = append(sids, server.GetAllSID()...)
}
return sids
}
func (s *Srv) getSidServer(sid string) (ServerAdapter, error) {
for _, server := range s.Server {
for _, id := range server.GetAllSID() {
if id == sid {
return server, nil
}
}
}
return nil, errors.New("the sid is destroy")
}
func (s *Srv) callPushMiddleware(c *Context, resp *Response) (*Context, error) {
if len(s.pushMiddleware) == 0 {
return c, nil
}
ctx := c.clone()
ctx.Response.Cmd = resp.Cmd
ctx.Response.Code = resp.Code
ctx.Response.Msg = resp.Msg
ctx.Response.Data = resp.Data
ctx.Response.Seqno = randomString(12)
for _, h := range s.pushMiddleware {
if err := h(ctx); err != nil {
return nil, err
}
}
return ctx, nil
}
// Run 开始接收命令消息,运行框架,会阻塞当前 goroutine
func (s *Srv) Run() error {
mdlLen := len(s.middleware)
for cmd, hs := range s.routes {
text := ""
if len(hs) > 0 {
h := hs[len(hs)-1]
text = fmt.Sprintf("[SRV-debug] %s => %s[%d handlers]", cmd, funcName(h), len(hs)+mdlLen)
} else {
text = fmt.Sprintf("[SRV-debug] %s => [%d handlers]", cmd, mdlLen)
}
fmt.Println(text)
}
s.serverMu.Lock()
s.isRunning = true
for _, server := range s.Server {
go s.startServer(server)
}
s.serverMu.Unlock()
err := <-s.runErr
return err
}