Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
IoTServ committed Jan 8, 2025
1 parent c883b39 commit c848be3
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
2 changes: 1 addition & 1 deletion netservice/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func HandleSession(session *yamux.Session, tokenStr string) {
// Accept a stream
stream, err := session.AcceptStream()
if err != nil {
log.Println("accpStreamErr:" + err.Error())
log.Println("accept stream form session got err:" + err.Error())
if stream != nil {
stream.Close()
}
Expand Down
2 changes: 2 additions & 0 deletions services/gatewayManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func (gm *GatewayCtl) Loged() bool {
return len(gm.serverSession) > 0
}

// AddServer 添加网关实例,登录一个id
func (gm *GatewayCtl) AddServer(token string) (err error) {
tokenModel, err := models.DecodeUnverifiedToken(token)
if err != nil {
Expand All @@ -35,6 +36,7 @@ func (gm *GatewayCtl) AddServer(token string) (err error) {
return serverSession.start()
}

// DelServer 删除网关实例,删除一个id
func (gm *GatewayCtl) DelServer(runid string) (err error) {
if _, ok := gm.serverSession[runid]; ok {
log.Println("找到了runid的serverSession")
Expand Down
42 changes: 23 additions & 19 deletions services/serverSession.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
)

type ServerSession struct {
token string
tokenModel *models.TokenClaims
//基础信息
token string
tokenModel *models.TokenClaims
//内部存储
session *yamux.Session
heartbeat *time.Ticker
quit chan struct{}
Expand All @@ -25,14 +27,15 @@ func (ss *ServerSession) stop() {
}

func (ss *ServerSession) start() (err error) {
ss.CheckSessionStatus()
//防止多次调用
ss.checkSessionStatus()
ss.heartbeat = time.NewTicker(time.Second * 20)
ss.quit = make(chan struct{})
go ss.Task()
go ss.task()
return
}

func (ss *ServerSession) LoginServer() (err error) {
func (ss *ServerSession) loginServer() (err error) {
ss.loginLock.Lock()
defer ss.loginLock.Unlock()
if ss.session != nil && !ss.session.IsClosed() {
Expand All @@ -46,17 +49,18 @@ func (ss *ServerSession) LoginServer() (err error) {
return
}

func (ss *ServerSession) LoopStream() {
func (ss *ServerSession) loopStream() {
ss.loopStreamLock.Lock()
defer ss.loopStreamLock.Unlock()
defer func() {
if ss.session != nil {
err := ss.session.Close()
if err != nil {
log.Println(err.Error())
}
}
}()
//防止影响新创建的会话,不关闭会话
//defer func() {
// if ss.session != nil {
// err := ss.session.Close()
// if err != nil {
// log.Println(err.Error())
// }
// }
//}()
for {
if ss.session == nil || (ss.session != nil && ss.session.IsClosed()) {
log.Println("ss.session is nil")
Expand All @@ -73,24 +77,24 @@ func (ss *ServerSession) LoopStream() {
}
}

func (ss *ServerSession) CheckSessionStatus() {
func (ss *ServerSession) checkSessionStatus() {
if ss.session == nil || (ss.session != nil && ss.session.IsClosed()) {
log.Println("开始(重新)连接:", ss.tokenModel.RunId, "@", ss.tokenModel.Host)
err := ss.LoginServer()
err := ss.loginServer()
if err != nil {
log.Println(err)
return
}
go ss.LoopStream()
go ss.loopStream()
}
}

func (ss *ServerSession) Task() {
func (ss *ServerSession) task() {
for {
select {
//心跳来了,检测连接的存活状态
case <-ss.heartbeat.C:
ss.CheckSessionStatus()
ss.checkSessionStatus()
case <-ss.quit:
ss.heartbeat.Stop()
close(ss.quit)
Expand Down

0 comments on commit c848be3

Please sign in to comment.