Skip to content

Commit

Permalink
agent: support go-discover retry-join for wan
Browse files Browse the repository at this point in the history
  • Loading branch information
magiconair committed Aug 19, 2017
1 parent 642fcd4 commit 8f28f8a
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 51 deletions.
4 changes: 2 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ func (a *Agent) Start() error {
}

// start retry join
go a.retryJoin()
go a.retryJoinWan()
go a.retryJoinLAN()
go a.retryJoinWAN()

return nil
}
Expand Down
118 changes: 69 additions & 49 deletions agent/retry_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,88 @@ package agent

import (
"fmt"
"log"
"strings"
"time"

discover "github.com/hashicorp/go-discover"
)

// RetryJoin is used to handle retrying a join until it succeeds or all
func (a *Agent) retryJoinLAN() {
r := &retryJoiner{
cluster: "LAN",
addrs: a.config.RetryJoin,
maxAttempts: a.config.RetryMaxAttempts,
interval: a.config.RetryInterval,
join: a.JoinLAN,
logger: a.logger,
}
if err := r.retryJoin(); err != nil {
a.retryJoinCh <- err
}
}

func (a *Agent) retryJoinWAN() {
r := &retryJoiner{
cluster: "WAN",
addrs: a.config.RetryJoinWan,
maxAttempts: a.config.RetryMaxAttemptsWan,
interval: a.config.RetryIntervalWan,
join: a.JoinWAN,
logger: a.logger,
}
if err := r.retryJoin(); err != nil {
a.retryJoinCh <- err
}
}

// retryJoiner is used to handle retrying a join until it succeeds or all
// retries are exhausted.
func (a *Agent) retryJoin() {
cfg := a.config
if len(cfg.RetryJoin) == 0 {
return
type retryJoiner struct {
// cluster is the name of the serf cluster, e.g. "LAN" or "WAN".
cluster string

// addrs is the list of servers or go-discover configurations
// to join with.
addrs []string

// maxAttempts is the number of join attempts before giving up.
maxAttempts int

// interval is the time between two join attempts.
interval time.Duration

// join adds the discovered or configured servers to the given
// serf cluster.
join func([]string) (int, error)

// logger is the agent logger. Log messages should contain the
// "agent: " prefix.
logger *log.Logger
}

func (r *retryJoiner) retryJoin() error {
if len(r.addrs) == 0 {
return nil
}

disco := discover.Discover{}
a.logger.Printf("[INFO] agent: Retry join is supported for: %s", strings.Join(disco.Names(), " "))
a.logger.Printf("[INFO] agent: Joining cluster...")
r.logger.Printf("[INFO] agent: Retry join %s is supported for: %s", r.cluster, strings.Join(disco.Names(), " "))
r.logger.Printf("[INFO] agent: Joining %s cluster...", r.cluster)
attempt := 0
for {
var addrs []string
var err error

for _, addr := range cfg.RetryJoin {
for _, addr := range r.addrs {
switch {
case strings.Contains(addr, "provider="):
servers, err := disco.Addrs(addr, a.logger)
servers, err := disco.Addrs(addr, r.logger)
if err != nil {
a.logger.Printf("[ERR] agent: %s", err)
r.logger.Printf("[ERR] agent: Join %s: %s", r.cluster, err)
} else {
addrs = append(addrs, servers...)
a.logger.Printf("[INFO] agent: Discovered servers: %s", strings.Join(servers, " "))
r.logger.Printf("[INFO] agent: Discovered %s servers: %s", r.cluster, strings.Join(servers, " "))
}

default:
Expand All @@ -41,10 +92,10 @@ func (a *Agent) retryJoin() {
}

if len(addrs) > 0 {
n, err := a.JoinLAN(addrs)
n, err := r.join(addrs)
if err == nil {
a.logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
return
r.logger.Printf("[INFO] agent: Join %s completed. Synced with %d initial agents", r.cluster, n)
return nil
}
}

Expand All @@ -53,42 +104,11 @@ func (a *Agent) retryJoin() {
}

attempt++
if cfg.RetryMaxAttempts > 0 && attempt > cfg.RetryMaxAttempts {
a.retryJoinCh <- fmt.Errorf("agent: max join retry exhausted, exiting")
return
}

a.logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err, cfg.RetryInterval)
time.Sleep(cfg.RetryInterval)
}
}

// RetryJoinWan is used to handle retrying a join -wan until it succeeds or all
// retries are exhausted.
func (a *Agent) retryJoinWan() {
cfg := a.config

if len(cfg.RetryJoinWan) == 0 {
return
}

a.logger.Printf("[INFO] agent: Joining WAN cluster...")

attempt := 0
for {
n, err := a.JoinWAN(cfg.RetryJoinWan)
if err == nil {
a.logger.Printf("[INFO] agent: Join -wan completed. Synced with %d initial agents", n)
return
}

attempt++
if cfg.RetryMaxAttemptsWan > 0 && attempt > cfg.RetryMaxAttemptsWan {
a.retryJoinCh <- fmt.Errorf("agent: max join -wan retry exhausted, exiting")
return
if r.maxAttempts > 0 && attempt > r.maxAttempts {
return fmt.Errorf("agent: max join %s retry exhausted, exiting", r.cluster)
}

a.logger.Printf("[WARN] agent: Join -wan failed: %v, retrying in %v", err, cfg.RetryIntervalWan)
time.Sleep(cfg.RetryIntervalWan)
r.logger.Printf("[WARN] agent: Join %s failed: %v, retrying in %v", r.cluster, err, r.interval)
time.Sleep(r.interval)
}
}

0 comments on commit 8f28f8a

Please sign in to comment.