-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathtclient_reconnect.go
156 lines (140 loc) · 4.25 KB
/
tclient_reconnect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package turbo
import (
"math"
"net"
"sync"
"time"
)
//------------- reconnect task
// reconnectTask holds the reconnect state for a single remote client.
type reconnectTask struct {
	remoteClient *TClient          // client whose connection is re-dialed on each attempt
	retryCount   int               // number of reconnect attempts made so far (incremented before each dial)
	nextTimeout  time.Duration     // delay passed to the timer wheel when this task is (re)scheduled
	ga           *GroupAuth        // auth info handed to the handshake after every successful dial
	finishHook   func(addr string) // invoked with the remote address when the manager gives up on this task
}
// newReconnectTasK creates a reconnectTask for remoteClient with no attempts
// recorded yet (retryCount 0) and a zero nextTimeout for its first scheduling.
// ga is forwarded to the handshake on every attempt; finishHook is kept for
// the manager to call if it gives up on this client.
func newReconnectTasK(remoteClient *TClient, ga *GroupAuth, finishHook func(addr string)) *reconnectTask {
	task := new(reconnectTask)
	task.remoteClient = remoteClient
	task.ga = ga
	task.finishHook = finishHook
	return task
}
// reconnect performs a single reconnect attempt for the task's client.
//
// It increments retryCount (so the counter tracks attempts, successful or not),
// resolves and dials the client's remote address over "tcp4", installs the new
// connection on the client, calls Start() on it, and finally runs handshake so
// the connection metadata is uploaded again.
//
// Returns handshake's result when the dial succeeds, or (false, err) when the
// address cannot be resolved or the dial fails.
func (self *reconnectTask) reconnect(handshake func(ga *GroupAuth, remoteClient *TClient) (bool, error)) (bool, error) {
	self.retryCount++
	addr := self.remoteClient.RemoteAddr()
	// Surface resolution failures directly instead of dialing a nil address,
	// which would only report an opaque "missing address" error.
	tcpAddr, err := net.ResolveTCPAddr("tcp4", addr)
	if nil != err {
		log.Errorf("TClient|RECONNECT|%s|FAIL|%s", addr, err)
		return false, err
	}
	conn, err := net.DialTCP("tcp4", nil, tcpAddr)
	if nil != err {
		log.Errorf("TClient|RECONNECT|%s|FAIL|%s", addr, err)
		return false, err
	}
	// Swap in the fresh connection and restart the client on top of it.
	self.remoteClient.conn = conn
	self.remoteClient.Start()
	return handshake(self.ga, self.remoteClient)
}
// ReconnectManager is the reconnect manager: it schedules reconnect attempts
// for TClients on a TimerWheel and tracks at most one pending task per remote
// address (hostport).
type ReconnectManager struct {
	timers            map[string] /*hostport*/ uint32 // timer id of the pending task per remote address; guarded by lock
	allowReconnect    bool                            // whether reconnecting is allowed at all
	reconnectTimeout  time.Duration                   // reconnect timeout: base delay, the attempt after n failures waits reconnectTimeout * 2^n
	maxReconnectTimes int                             // max reconnect times: once retryCount exceeds this, the task is dropped and finishHook runs
	tw                *TimerWheel                     // wheel that fires the scheduled attempts
	handshake         func(ga *GroupAuth, remoteClient *TClient) (bool, error) // run after each successful dial
	lock              sync.Mutex                      // guards timers
}
// NewReconnectManager creates a ReconnectManager.
//
// allowReconnect gates whether submit schedules anything, reconnectTimeout is
// the base delay for the exponential backoff between attempts,
// maxReconnectTimes bounds how many attempts are made before giving up, and
// handshake is run after every successful dial. The manager creates its own
// TimerWheel with a 1s argument (presumably the tick interval — confirm in
// NewTimerWheel).
func NewReconnectManager(allowReconnect bool,
	reconnectTimeout time.Duration, maxReconnectTimes int,
	handshake func(ga *GroupAuth, remoteClient *TClient) (bool, error)) *ReconnectManager {
	m := new(ReconnectManager)
	m.timers = make(map[string]uint32, 20)
	m.tw = NewTimerWheel(1 * time.Second)
	m.allowReconnect = allowReconnect
	m.reconnectTimeout = reconnectTimeout
	m.maxReconnectTimes = maxReconnectTimes
	m.handshake = handshake
	log.Infof("ReconnectManager|Start...")
	return m
}
// submit schedules a reconnect task for client c.
//
// It does nothing when reconnecting is disabled or when a task for c's remote
// address is already tracked. ga is forwarded to the handshake on every
// attempt; finishHook is called with the address if the manager gives up.
func (self *ReconnectManager) submit(c *TClient, ga *GroupAuth, finishHook func(addr string)) {
	self.lock.Lock()
	defer self.lock.Unlock()
	// Checked while holding the lock so the read is synchronized with writers
	// that also hold self.lock, instead of racing with them.
	if !self.allowReconnect {
		return
	}
	// A reconnect task for this address is already in progress: ignore.
	if _, pending := self.timers[c.RemoteAddr()]; pending {
		return
	}
	self.submit0(newReconnectTasK(c, ga, finishHook))
}
// reconnectBackoff returns base * 2^retryCount, saturating at the largest
// representable Duration so a large retry count can never overflow into a
// negative (effectively immediate) timeout. A non-positive base or retry
// count returns base unchanged.
func reconnectBackoff(base time.Duration, retryCount int) time.Duration {
	const maxDuration = time.Duration(math.MaxInt64)
	if base <= 0 || retryCount <= 0 {
		return base
	}
	if retryCount >= 63 || base > maxDuration>>uint(retryCount) {
		return maxDuration
	}
	return base << uint(retryCount)
}

// ownsReconnectTimer reports whether timerid is still the timer recorded for
// addr, i.e. the task was neither cancelled nor replaced by a newer one.
// The caller must hold self.lock.
func (self *ReconnectManager) ownsReconnectTimer(addr string, timerid uint32) bool {
	current, ok := self.timers[addr]
	return ok && current == timerid
}

// reconnectTaskLive reports, under self.lock, whether reconnecting is still
// allowed and timerid is still the live timer for addr.
func (self *ReconnectManager) reconnectTaskLive(addr string, timerid uint32) bool {
	self.lock.Lock()
	defer self.lock.Unlock()
	return self.allowReconnect && self.ownsReconnectTimer(addr, timerid)
}

// submit0 schedules task on the timer wheel after task.nextTimeout and records
// the resulting timer id in self.timers under the task's remote address.
//
// The caller MUST hold self.lock; otherwise writing self.timers is unsafe.
//
// When the timer fires, the callback:
//   - skips the attempt if the task was cancelled or replaced, or the manager
//     was stopped, while it was waiting;
//   - on success, forgets the task;
//   - on failure, gives up once retryCount exceeds maxReconnectTimes (removing
//     the task and calling finishHook), otherwise reschedules itself after
//     reconnectTimeout * 2^retryCount.
//
// A failed attempt never resurrects a task that cancel() removed, or that a
// newer submit replaced, while the attempt was in flight.
func (self *ReconnectManager) submit0(task *reconnectTask) {
	addr := task.remoteClient.RemoteAddr()
	// timerid is assigned below while the caller holds self.lock, and the
	// callback reads it only after acquiring self.lock, so the mutex orders
	// the write before every read.
	var timerid uint32
	timerid, _ = self.tw.AddTimer(
		task.nextTimeout,
		func(_ uint32, _ time.Time) {
			// Cancelled or stopped after the wheel had already dispatched us.
			if !self.reconnectTaskLive(addr, timerid) {
				return
			}
			succ, err := task.reconnect(self.handshake)
			if nil == err && succ {
				self.lock.Lock()
				defer self.lock.Unlock()
				// Only remove our own entry, never a newer task's.
				if self.ownsReconnectTimer(addr, timerid) {
					delete(self.timers, addr)
				}
				log.Infof("ReconnectManager|RECONNECT|SUCC|%s|addr:%s|retryCount:%d", task.remoteClient.RemoteAddr(), addr, task.retryCount)
				return
			}

			log.Infof("ReconnectManager|RECONNECT|FAIL|%v|%s|%d", err, addr, task.retryCount)
			self.lock.Lock()
			defer self.lock.Unlock()
			// cancel()/stop() ran, or a newer task took the slot, while this
			// attempt was in flight: do not reschedule.
			if !self.allowReconnect || !self.ownsReconnectTimer(addr, timerid) {
				return
			}
			// Give up once the retry count exceeds the maximum.
			if task.retryCount > self.maxReconnectTimes {
				log.Infof("ReconnectManager|OVREFLOW MAX TRYCOUNT|REMOVE|%s|%d", addr, task.retryCount)
				delete(self.timers, addr)
				// Called while holding self.lock, as before; finishHook must not
				// call back into this manager's locking methods.
				task.finishHook(addr)
				return
			}
			// Schedule the next attempt with exponential backoff; the lock is
			// held, as submit0 requires.
			task.nextTimeout = reconnectBackoff(self.reconnectTimeout, task.retryCount)
			self.submit0(task)
		}, nil)
	// Safe: the caller holds self.lock.
	self.timers[addr] = timerid
}
// cancel removes the pending reconnect task for hostport, if any, and cancels
// its timer on the wheel. It is a no-op when no task is tracked for hostport.
func (self *ReconnectManager) cancel(hostport string) {
	self.lock.Lock()
	defer self.lock.Unlock()
	timerID, tracked := self.timers[hostport]
	if !tracked {
		return
	}
	delete(self.timers, hostport)
	self.tw.CancelTimer(timerID)
}
// stop disables reconnecting and cancels every pending reconnect timer.
// Entries in self.timers are left in place; finishHook is not invoked for
// the cancelled tasks.
func (self *ReconnectManager) stop() {
	self.lock.Lock()
	defer self.lock.Unlock()
	// Written while holding the lock: submit and the timer callbacks may read
	// this flag concurrently, and an unguarded write races with them.
	self.allowReconnect = false
	for _, tid := range self.timers {
		self.tw.CancelTimer(tid)
	}
	log.Infof("ReconnectManager|stop...")
}