Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

backoff late connection failures #1496

Merged
merged 5 commits into from
Oct 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 37 additions & 22 deletions router/connection_maker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
// Timing parameters for the connection retry backoff.
const (
// InitialInterval is the retry interval a target is given whenever it is
// scheduled for an immediate attempt (see nextTryNow).
InitialInterval = 2 * time.Second
// MaxInterval is the bound a target's growing retry interval is compared
// against; it is also the interval assigned by nextTryNever.
MaxInterval = 6 * time.Minute
// ResetAfter is how far past a target's tryAfter time an outbound
// connection must terminate for the next attempt to be scheduled
// immediately, with the interval back at InitialInterval, instead of
// backing off further.
ResetAfter = 1 * time.Minute
)

type peerAddrs map[string]*net.TCPAddr
Expand All @@ -25,9 +26,17 @@ type ConnectionMaker struct {
actionChan chan<- ConnectionMakerAction
}

// TargetState records where a Target currently is in its connection
// lifecycle. Only targets in TargetWaiting are considered when choosing
// addresses to attempt; the other two states are skipped.
type TargetState int

// Possible values of TargetState.
const (
TargetWaiting TargetState = iota // we are waiting to connect there
TargetAttempting // we are attempting to connect there
TargetConnected // we are connected to there
)

// Information about an address where we may find a peer
type Target struct {
attempting bool // are we currently attempting to connect there?
state TargetState
lastError error // reason for disconnection last time
tryAfter time.Time // next time to try this address
tryInterval time.Duration // retry delay on next failure
Expand Down Expand Up @@ -73,7 +82,7 @@ func (cm *ConnectionMaker) InitiateConnections(peers []string, replace bool) []e
cm.directPeers[peer] = addr
// curtail any existing reconnect interval
if target, found := cm.targets[addr.String()]; found {
target.tryNow()
target.nextTryNow()
}
}
return true
Expand All @@ -92,7 +101,10 @@ func (cm *ConnectionMaker) ForgetConnections(peers []string) {

func (cm *ConnectionMaker) ConnectionAborted(address string, err error) {
cm.actionChan <- func() bool {
cm.retry(address, err)
target := cm.targets[address]
target.state = TargetWaiting
target.lastError = err
target.nextTryLater()
return true
}
}
Expand All @@ -101,7 +113,8 @@ func (cm *ConnectionMaker) ConnectionCreated(conn Connection) {
cm.actionChan <- func() bool {
cm.connections[conn] = void
if conn.Outbound() {
delete(cm.targets, conn.RemoteTCPAddr())
target := cm.targets[conn.RemoteTCPAddr()]
target.state = TargetConnected
}
return false
}
Expand All @@ -111,7 +124,17 @@ func (cm *ConnectionMaker) ConnectionTerminated(conn Connection, err error) {
cm.actionChan <- func() bool {
delete(cm.connections, conn)
if conn.Outbound() {
cm.retry(conn.RemoteTCPAddr(), err)
target := cm.targets[conn.RemoteTCPAddr()]
target.state = TargetWaiting
target.lastError = err
switch {
case err == ErrConnectToSelf:
target.nextTryNever()
case time.Now().After(target.tryAfter.Add(ResetAfter)):
target.nextTryNow()
default:
target.nextTryLater()
}
}
return true
}
Expand Down Expand Up @@ -151,8 +174,8 @@ func (cm *ConnectionMaker) checkStateAndAttemptConnections() time.Duration {
if _, found := cm.targets[address]; found {
return
}
target := &Target{}
target.tryNow()
target := &Target{state: TargetWaiting}
target.nextTryNow()
cm.targets[address] = target
}

Expand Down Expand Up @@ -238,7 +261,7 @@ func (cm *ConnectionMaker) connectToTargets(validTarget map[string]struct{}, dir
now := time.Now() // make sure we catch items just added
after := MaxDuration
for address, target := range cm.targets {
if target.attempting {
if target.state != TargetWaiting {
continue
}
if _, valid := validTarget[address]; !valid {
Expand All @@ -250,7 +273,7 @@ func (cm *ConnectionMaker) connectToTargets(validTarget map[string]struct{}, dir
}
switch duration := target.tryAfter.Sub(now); {
case duration <= 0:
target.attempting = true
target.state = TargetAttempting
_, isCmdLineTarget := directTarget[address]
go cm.attemptConnection(address, isCmdLineTarget)
case duration < after:
Expand All @@ -268,27 +291,19 @@ func (cm *ConnectionMaker) attemptConnection(address string, acceptNewPeer bool)
}
}

func (cm *ConnectionMaker) retry(address string, err error) {
if target, found := cm.targets[address]; found {
target.attempting = false
target.lastError = err
target.retry()
}
// nextTryNever sets the target's next-try time to the zero time.Time and
// its retry interval to MaxInterval. It is used when a connection ended
// with ErrConnectToSelf. The status listing reports a waiting target with
// a zero tryAfter as retry "never".
// NOTE(review): presumably the scheduling loop skips targets whose
// tryAfter is zero; that check is not visible in this diff, so confirm.
func (t *Target) nextTryNever() {
t.tryAfter = time.Time{}
t.tryInterval = MaxInterval
}

func (t *Target) tryNow() {
func (t *Target) nextTryNow() {
t.tryAfter = time.Now()
t.tryInterval = InitialInterval
}

// The delay at the nth retry is a random value in the range
// [i-i/2,i+i/2], where i = InitialInterval * 1.5^(n-1).
func (t *Target) retry() {
if t.lastError == ErrConnectToSelf {
t.tryAfter = time.Time{}
t.tryInterval = MaxInterval
return
}
func (t *Target) nextTryLater() {
t.tryAfter = time.Now().Add(t.tryInterval/2 + time.Duration(rand.Int63n(int64(t.tryInterval))))
t.tryInterval = t.tryInterval * 3 / 2
if t.tryInterval > MaxInterval {
Expand Down
33 changes: 19 additions & 14 deletions router/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,23 +173,28 @@ func NewLocalConnectionStatusSlice(cm *ConnectionMaker) []LocalConnectionStatus
slice = append(slice, LocalConnectionStatus{conn.RemoteTCPAddr(), conn.Outbound(), state, conn.Remote().String()})
}
for address, target := range cm.targets {
var state, info string
switch {
case target.lastError == nil:
state = "connecting"
info = ""
case target.attempting:
state = "retrying"
info = target.lastError.Error()
default:
state = "failed"
retry := "never"
add := func(state, info string) {
slice = append(slice, LocalConnectionStatus{address, true, state, info})
}
switch target.state {
case TargetWaiting:
until := "never"
if !target.tryAfter.IsZero() {
retry = target.tryAfter.String()
until = target.tryAfter.String()
}
if target.lastError == nil { // shouldn't happen
add("waiting", "until: "+until)
} else {
add("failed", target.lastError.Error()+", retry: "+until)
}
case TargetAttempting:
if target.lastError == nil {
add("connecting", "")
} else {
add("retrying", target.lastError.Error())
}
info = target.lastError.Error() + ", retry: " + retry
case TargetConnected:
}
slice = append(slice, LocalConnectionStatus{address, true, state, info})
}
resultChan <- slice
return false
Expand Down