Skip to content

Commit

Permalink
keep alive
Browse files Browse the repository at this point in the history
  • Loading branch information
lizs committed Apr 28, 2018
1 parent 97af17c commit 81cd76b
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 62 deletions.
40 changes: 32 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ type Client struct {
session *Session
autoRetryEnabled bool
sta *STAService
handler IHandler
}

func (c *Client) OnOpen(s *Session) {
c.session.responseTime = time.Now().Unix()
c.handler.OnOpen(s)
}

Expand All @@ -23,6 +25,7 @@ func (c *Client) OnClose(s *Session, force bool) {

// reconnect
if !force && c.autoRetryEnabled {
log.Println("reconnecting ...")
c.Stop()
c.Start()
}
Expand All @@ -38,20 +41,40 @@ func (c *Client) OnPush(s *Session, data []byte) int16 {

// keep alive
func (c *Client) keepAlive() {
if c.session.elapsedSinceLastResponse() > 60 {
c.session.Close(false)
} else if c.session.elapsedSinceLastResponse() > 20 {
c.session.Ping()
defer Recover()

log.Println("Keep alive running.")

d := time.Second * 5
t := time.NewTimer(d)

stop := false
for !stop {
select {
case <-t.C:
t.Reset(d)
if c.session.elapsedSinceLastResponse() > 30 {
c.session.Close(false)
} else if c.session.elapsedSinceLastResponse() > 10 {
c.session.Ping()
}

case <-c.keepAliveSignal:
stop = true
}
}

log.Println("Keep alive stopped.")
}

// NewClient new tcp client
func NewClient(host string, h IHandler, autoRetry bool) *Client {
ret := &Client{
autoRetryEnabled: autoRetry,
session: nil,
handler: h,
}
ret.Node = newNode(host, h, 10)
ret.Node = newNode(host, ret)

return ret
}
Expand Down Expand Up @@ -81,8 +104,8 @@ func (c *Client) Start() {
break
}

// io counter
go c.ioCounter()
// base start
c.Node.Start()

// make session
c.session = newSession(0, conn, &c.Node)
Expand All @@ -98,11 +121,12 @@ func (c *Client) Start() {

// Stop client shutdown
func (c *Client) Stop() {
c.Node.Stop()

if c.session != nil {
c.session.Close(true)
c.session = nil
}

c.Node.Stop()
log.Println("client stopped.")
}
4 changes: 2 additions & 2 deletions enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ package gomsg
type NetError int16

const (
Success NetError = 0
ExceptionCatched = iota + -100
Success NetError = iota
ExceptionCatched
Write
Read
RequestDataIsEmpty
Expand Down
4 changes: 2 additions & 2 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func CloseLog() {
// Recover recover tool function
func Recover() {
if e := recover(); e != nil {
log.Println(debug.Stack())
log.Printf("%s=>%s\n", e, debug.Stack())
if Logger != nil {
Logger.Println(debug.Stack())
Logger.Printf("%s=>%s\n", e, debug.Stack())
}
}
}
63 changes: 26 additions & 37 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type Callback func(*Result)

// internal with keep alive handling
type iInternalHandler interface {
type iinternal interface {
IHandler
keepAlive()
}
Expand All @@ -24,54 +24,47 @@ type IHandler interface {

// Node struct
type Node struct {
addr string
handler IHandler
signal chan int
ReadCounter chan int
WriteCounter chan int
keepAliveInterval int
addr string
internalHandler iinternal
signal chan int
keepAliveSignal chan int
ReadCounter chan int
WriteCounter chan int
}

// NewNode make node ptr
func newNode(addr string, h IHandler, keepAliveInterval int) Node {
func newNode(addr string, h iinternal) Node {
return Node{
addr: addr,
handler: h,
ReadCounter: make(chan int),
WriteCounter: make(chan int),
signal: make(chan int),
keepAliveInterval: keepAliveInterval,
addr: addr,
internalHandler: h,
ReadCounter: make(chan int),
WriteCounter: make(chan int),
signal: make(chan int),
keepAliveSignal: make(chan int, 1),
}
}

func (n *Node) keepAlive() {}

func (n *Node) OnOpen(*Session) {}

func (n *Node) OnClose(*Session, bool) {}

func (n *Node) OnReq(*Session, []byte, Callback) {}

func (n *Node) OnPush(*Session, []byte) int16 {
return 0
}

// Stop stop the IOCounter service
func (n *Node) Stop() {
n.signal <- 1
n.keepAliveSignal <- 1
}

// Start
func (n *Node) Start() {
go n.ioCounter()
go n.internalHandler.keepAlive()
}

// IOCounter io couter
func (n *Node) ioCounter() {
defer Recover()

log.Println("IOCounter running.")
reads := 0
writes := 0
d1 := time.Second * 5
t1 := time.NewTimer(d1)

d2 := time.Duration(n.keepAliveInterval) * time.Second
t2 := time.NewTimer(d2)
d := time.Second * 5
t := time.NewTimer(d)

stop := false
for !stop {
Expand All @@ -82,15 +75,11 @@ func (n *Node) ioCounter() {
case m := <-n.WriteCounter:
writes += m

case <-t1.C:
t1.Reset(d1)
case <-t.C:
t.Reset(d)
log.Printf("Read : %d/s\tWrite : %d/s\n", reads/5, writes/5)
reads, writes = 0, 0

case <-t2.C:
t2.Reset(d2)
n.keepAlive()

case <-n.signal:
stop = true
}
Expand Down
36 changes: 29 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package gomsg
import (
"log"
"net"
"time"
)

// Server struct
type Server struct {
Node
sessions map[int32]*Session
seed int32
handler IHandler
}

// OnOpen ...
Expand Down Expand Up @@ -38,22 +40,41 @@ func (s *Server) OnPush(session *Session, data []byte) int16 {

// NewServer new tcp server
func NewServer(host string, h IHandler) *Server {
return &Server{
ret := &Server{
sessions: make(map[int32]*Session),
seed: 0,
Node: newNode(host, h, 20),
handler: h,
}

ret.Node = newNode(host, ret)
return ret
}

// keep alive
func (s *Server) keepAlive() {
for _, session := range s.sessions {
if session.elapsedSinceLastResponse() > 40 {
session.Ping()
} else if session.elapsedSinceLastResponse() > 60 {
session.Close(true)
defer Recover()
d := time.Second * 5
t := time.NewTimer(d)

stop := false
for !stop {
select {
case <-t.C:
t.Reset(d)
for _, session := range s.sessions {
if session.elapsedSinceLastResponse() > 40 {
session.Ping()
} else if session.elapsedSinceLastResponse() > 60 {
session.Close(true)
}
}

case <-s.keepAliveSignal:
stop = true
}
}

log.Println("Keep alive stopped.")
}

// Start server startup
Expand Down Expand Up @@ -89,6 +110,7 @@ func (s *Server) Stop() {
}

s.sessions = make(map[int32]*Session)
s.keepAliveSignal <- 1
s.Node.Stop()
log.Println("server stopped.")
}
Expand Down
9 changes: 5 additions & 4 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (s *Session) scan() {
for input.Scan() {
// dispatch
s.dispatch(input.Bytes())

s.node.ReadCounter <- 1
}

Expand All @@ -105,7 +106,7 @@ func (s *Session) Close(force bool) {
s.conn.Close()
s.closed = true

s.node.OnClose(s, force)
s.node.internalHandler.OnClose(s, force)

log.Printf("conn [%d] closed.\n", s.ID)
}
Expand Down Expand Up @@ -278,12 +279,12 @@ func (s *Session) Write(data []byte) error {
// Request request remote to response
func (s *Session) Request(data []byte) *Result {
if len(data) == 0 {
return &Result{En: RequestDataIsEmpty}
return &Result{En: int16(RequestDataIsEmpty)}
}

s.reqSeed++
if _, exists := s.reqPool[s.reqSeed]; exists {
return &Result{En: SerialConflict}
return &Result{En: int16(SerialConflict)}
}

req := make(chan *Result)
Expand All @@ -301,7 +302,7 @@ func (s *Session) Request(data []byte) *Result {

err := s.Write(buf.Bytes())
if err != nil {
return &Result{En: Write}
return &Result{En: int16(Write)}
}

return <-req
Expand Down
4 changes: 2 additions & 2 deletions sta.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *STAService) startImp() {
ret.session.response(ret.serial, ret.ret)

case push := <-s.push:
ret := push.session.node.OnPush(push.session, push.body)
ret := push.session.node.internalHandler.OnPush(push.session, push.body)
if ret != 0 {
log.Printf("onPush : %d\n", ret)
}
Expand All @@ -83,7 +83,7 @@ func (s *STAService) startImp() {
req <- &Result{En: rsp.en, Data: rsp.body}

case req := <-s.req:
req.session.node.OnReq(req.session, req.body, func(r *Result) {
req.session.node.internalHandler.OnReq(req.session, req.body, func(r *Result) {
STA().Ret <- NewRet(req.session, req.serial, r)
})
}
Expand Down

0 comments on commit 81cd76b

Please sign in to comment.