From c848be39abb1461991bb423be9c417e02cfb92e7 Mon Sep 17 00:00:00 2001 From: Yu Fang Date: Wed, 8 Jan 2025 23:37:53 +0800 Subject: [PATCH] update --- netservice/handle/handle.go | 2 +- services/gatewayManager.go | 2 ++ services/serverSession.go | 42 ++++++++++++++++++++----------------- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/netservice/handle/handle.go b/netservice/handle/handle.go index 1658976..8d1d0e3 100644 --- a/netservice/handle/handle.go +++ b/netservice/handle/handle.go @@ -246,7 +246,7 @@ func HandleSession(session *yamux.Session, tokenStr string) { // Accept a stream stream, err := session.AcceptStream() if err != nil { - log.Println("accpStreamErr:" + err.Error()) + log.Println("accept stream form session got err:" + err.Error()) if stream != nil { stream.Close() } diff --git a/services/gatewayManager.go b/services/gatewayManager.go index 83c702e..c39d2c1 100644 --- a/services/gatewayManager.go +++ b/services/gatewayManager.go @@ -17,6 +17,7 @@ func (gm *GatewayCtl) Loged() bool { return len(gm.serverSession) > 0 } +// AddServer 添加网关实例,登录一个id func (gm *GatewayCtl) AddServer(token string) (err error) { tokenModel, err := models.DecodeUnverifiedToken(token) if err != nil { @@ -35,6 +36,7 @@ func (gm *GatewayCtl) AddServer(token string) (err error) { return serverSession.start() } +// DelServer 删除网关实例,删除一个id func (gm *GatewayCtl) DelServer(runid string) (err error) { if _, ok := gm.serverSession[runid]; ok { log.Println("找到了runid的serverSession") diff --git a/services/serverSession.go b/services/serverSession.go index e77af93..be05bd6 100644 --- a/services/serverSession.go +++ b/services/serverSession.go @@ -11,8 +11,10 @@ import ( ) type ServerSession struct { - token string - tokenModel *models.TokenClaims + //基础信息 + token string + tokenModel *models.TokenClaims + //内部存储 session *yamux.Session heartbeat *time.Ticker quit chan struct{} @@ -25,14 +27,15 @@ func (ss *ServerSession) stop() { } func (ss *ServerSession) start() (err error) { - ss.CheckSessionStatus() + //防止多次调用 + ss.checkSessionStatus() ss.heartbeat = time.NewTicker(time.Second * 20) ss.quit = make(chan struct{}) - go ss.Task() + go ss.task() return } -func (ss *ServerSession) LoginServer() (err error) { +func (ss *ServerSession) loginServer() (err error) { ss.loginLock.Lock() defer ss.loginLock.Unlock() if ss.session != nil && !ss.session.IsClosed() { @@ -46,17 +49,18 @@ func (ss *ServerSession) LoginServer() (err error) { return } -func (ss *ServerSession) LoopStream() { +func (ss *ServerSession) loopStream() { ss.loopStreamLock.Lock() defer ss.loopStreamLock.Unlock() - defer func() { - if ss.session != nil { - err := ss.session.Close() - if err != nil { - log.Println(err.Error()) - } - } - }() + //防止影响新创建的会话,不关闭会话 + //defer func() { + // if ss.session != nil { + // err := ss.session.Close() + // if err != nil { + // log.Println(err.Error()) + // } + // } + //}() for { if ss.session == nil || (ss.session != nil && ss.session.IsClosed()) { log.Println("ss.session is nil") @@ -73,24 +77,24 @@ func (ss *ServerSession) LoopStream() { } } -func (ss *ServerSession) CheckSessionStatus() { +func (ss *ServerSession) checkSessionStatus() { if ss.session == nil || (ss.session != nil && ss.session.IsClosed()) { log.Println("开始(重新)连接:", ss.tokenModel.RunId, "@", ss.tokenModel.Host) - err := ss.LoginServer() + err := ss.loginServer() if err != nil { log.Println(err) return } - go ss.LoopStream() + go ss.loopStream() } } -func (ss *ServerSession) Task() { +func (ss *ServerSession) task() { for { select { //心跳来了,检测连接的存活状态 case <-ss.heartbeat.C: - ss.CheckSessionStatus() + ss.checkSessionStatus() case <-ss.quit: ss.heartbeat.Stop() close(ss.quit)