diff --git a/client.go b/client.go index dd15acb..f04b7f1 100644 --- a/client.go +++ b/client.go @@ -12,9 +12,11 @@ type Client struct { session *Session autoRetryEnabled bool sta *STAService + handler IHandler } func (c *Client) OnOpen(s *Session) { + c.session.responseTime = time.Now().Unix() c.handler.OnOpen(s) } @@ -23,6 +25,7 @@ func (c *Client) OnClose(s *Session, force bool) { // reconnect if !force && c.autoRetryEnabled { + log.Println("reconnecting ...") c.Stop() c.Start() } @@ -38,11 +41,30 @@ func (c *Client) OnPush(s *Session, data []byte) int16 { // keep alive func (c *Client) keepAlive() { - if c.session.elapsedSinceLastResponse() > 60 { - c.session.Close(false) - } else if c.session.elapsedSinceLastResponse() > 20 { - c.session.Ping() + defer Recover() + + log.Println("Keep alive running.") + + d := time.Second * 5 + t := time.NewTimer(d) + + stop := false + for !stop { + select { + case <-t.C: + t.Reset(d) + if c.session.elapsedSinceLastResponse() > 30 { + c.session.Close(false) + } else if c.session.elapsedSinceLastResponse() > 10 { + c.session.Ping() + } + + case <-c.keepAliveSignal: + stop = true + } } + + log.Println("Keep alive stopped.") } // NewClient new tcp client @@ -50,8 +72,9 @@ func NewClient(host string, h IHandler, autoRetry bool) *Client { ret := &Client{ autoRetryEnabled: autoRetry, session: nil, + handler: h, } - ret.Node = newNode(host, h, 10) + ret.Node = newNode(host, ret) return ret } @@ -81,8 +104,8 @@ func (c *Client) Start() { break } - // io counter - go c.ioCounter() + // base start + c.Node.Start() // make session c.session = newSession(0, conn, &c.Node) @@ -98,11 +121,12 @@ func (c *Client) Start() { // Stop client shutdown func (c *Client) Stop() { + c.Node.Stop() + if c.session != nil { c.session.Close(true) c.session = nil } - c.Node.Stop() log.Println("client stopped.") } diff --git a/enum.go b/enum.go index 45558ec..bed2784 100644 --- a/enum.go +++ b/enum.go @@ -4,8 +4,8 @@ package gomsg type NetError int16 const ( - Success NetError = 0 - ExceptionCatched = iota + -100 + Success NetError = iota + ExceptionCatched Write Read RequestDataIsEmpty diff --git a/logger.go b/logger.go index aaf80dd..31f7ce1 100644 --- a/logger.go +++ b/logger.go @@ -65,9 +65,9 @@ func CloseLog() { // Recover recover tool function func Recover() { if e := recover(); e != nil { - log.Println(debug.Stack()) + log.Printf("%s=>%s\n", e, debug.Stack()) if Logger != nil { - Logger.Println(debug.Stack()) + Logger.Printf("%s=>%s\n", e, debug.Stack()) } } } diff --git a/node.go b/node.go index b0297bc..a1f9c25 100644 --- a/node.go +++ b/node.go @@ -9,7 +9,7 @@ import ( type Callback func(*Result) // internal with keep alive handling -type iInternalHandler interface { +type iinternal interface { IHandler keepAlive() } @@ -24,54 +24,47 @@ type IHandler interface { // Node struct type Node struct { - addr string - handler IHandler - signal chan int - ReadCounter chan int - WriteCounter chan int - keepAliveInterval int + addr string + internalHandler iinternal + signal chan int + keepAliveSignal chan int + ReadCounter chan int + WriteCounter chan int } // NewNode make node ptr -func newNode(addr string, h IHandler, keepAliveInterval int) Node { +func newNode(addr string, h iinternal) Node { return Node{ - addr: addr, - handler: h, - ReadCounter: make(chan int), - WriteCounter: make(chan int), - signal: make(chan int), - keepAliveInterval: keepAliveInterval, + addr: addr, + internalHandler: h, + ReadCounter: make(chan int), + WriteCounter: make(chan int), + signal: make(chan int), + keepAliveSignal: make(chan int, 1), } } -func (n *Node) keepAlive() {} - -func (n *Node) OnOpen(*Session) {} - -func (n *Node) OnClose(*Session, bool) {} - -func (n *Node) OnReq(*Session, []byte, Callback) {} - -func (n *Node) OnPush(*Session, []byte) int16 { - return 0 -} - // Stop stop the IOCounter service func (n *Node) Stop() { n.signal <- 1 + n.keepAliveSignal <- 1 +} + +// Start +func (n *Node) Start() { + go n.ioCounter() + go n.internalHandler.keepAlive() } // IOCounter io couter func (n *Node) ioCounter() { defer Recover() + log.Println("IOCounter running.") reads := 0 writes := 0 - d1 := time.Second * 5 - t1 := time.NewTimer(d1) - - d2 := time.Duration(n.keepAliveInterval) * time.Second - t2 := time.NewTimer(d2) + d := time.Second * 5 + t := time.NewTimer(d) stop := false for !stop { @@ -82,15 +75,11 @@ func (n *Node) ioCounter() { case m := <-n.WriteCounter: writes += m - case <-t1.C: - t1.Reset(d1) + case <-t.C: + t.Reset(d) log.Printf("Read : %d/s\tWrite : %d/s\n", reads/5, writes/5) reads, writes = 0, 0 - case <-t2.C: - t2.Reset(d2) - n.keepAlive() - case <-n.signal: stop = true } diff --git a/server.go b/server.go index efaaaf4..d8521e6 100644 --- a/server.go +++ b/server.go @@ -3,6 +3,7 @@ package gomsg import ( "log" "net" + "time" ) // Server struct @@ -10,6 +11,7 @@ type Server struct { Node sessions map[int32]*Session seed int32 + handler IHandler } // OnOpen ... @@ -38,22 +40,41 @@ func (s *Server) OnPush(session *Session, data []byte) int16 { // NewServer new tcp server func NewServer(host string, h IHandler) *Server { - return &Server{ + ret := &Server{ sessions: make(map[int32]*Session), seed: 0, - Node: newNode(host, h, 20), + handler: h, } + + ret.Node = newNode(host, ret) + return ret } // keep alive func (s *Server) keepAlive() { - for _, session := range s.sessions { - if session.elapsedSinceLastResponse() > 40 { - session.Ping() - } else if session.elapsedSinceLastResponse() > 60 { - session.Close(true) + defer Recover() + d := time.Second * 5 + t := time.NewTimer(d) + + stop := false + for !stop { + select { + case <-t.C: + t.Reset(d) + for _, session := range s.sessions { + if session.elapsedSinceLastResponse() > 40 { + session.Ping() + } else if session.elapsedSinceLastResponse() > 60 { + session.Close(true) + } + } + + case <-s.keepAliveSignal: + stop = true } } + + log.Println("Keep alive stopped.") } // Start server startup @@ -89,6 +110,7 @@ func (s *Server) Stop() { } s.sessions = make(map[int32]*Session) + s.keepAliveSignal <- 1 s.Node.Stop() log.Println("server stopped.") } diff --git a/session.go b/session.go index a58263a..26acf6b 100644 --- a/session.go +++ b/session.go @@ -90,6 +90,7 @@ func (s *Session) scan() { for input.Scan() { // dispatch s.dispatch(input.Bytes()) + s.node.ReadCounter <- 1 } @@ -105,7 +106,7 @@ func (s *Session) Close(force bool) { s.conn.Close() s.closed = true - s.node.OnClose(s, force) + s.node.internalHandler.OnClose(s, force) log.Printf("conn [%d] closed.\n", s.ID) } @@ -278,12 +279,12 @@ func (s *Session) Write(data []byte) error { // Request request remote to response func (s *Session) Request(data []byte) *Result { if len(data) == 0 { - return &Result{En: RequestDataIsEmpty} + return &Result{En: int16(RequestDataIsEmpty)} } s.reqSeed++ if _, exists := s.reqPool[s.reqSeed]; exists { - return &Result{En: SerialConflict} + return &Result{En: int16(SerialConflict)} } req := make(chan *Result) @@ -301,7 +302,7 @@ func (s *Session) Request(data []byte) *Result { err := s.Write(buf.Bytes()) if err != nil { - return &Result{En: Write} + return &Result{En: int16(Write)} } return <-req diff --git a/sta.go b/sta.go index 7d56890..ba30c10 100644 --- a/sta.go +++ b/sta.go @@ -67,7 +67,7 @@ func (s *STAService) startImp() { ret.session.response(ret.serial, ret.ret) case push := <-s.push: - ret := push.session.node.OnPush(push.session, push.body) + ret := push.session.node.internalHandler.OnPush(push.session, push.body) if ret != 0 { log.Printf("onPush : %d\n", ret) } @@ -83,7 +83,7 @@ func (s *STAService) startImp() { req <- &Result{En: rsp.en, Data: rsp.body} case req := <-s.req: - req.session.node.OnReq(req.session, req.body, func(r *Result) { + req.session.node.internalHandler.OnReq(req.session, req.body, func(r *Result) { STA().Ret <- NewRet(req.session, req.serial, r) }) }