Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: initialize mgw-wanfed to use local gateways more on startup #9528

Merged
merged 4 commits into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/9528.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
server: When WAN federating via mesh gateways, after initial federation default to using the local mesh gateways unless the heuristic indicates a bypass is required.
```
2 changes: 2 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4682,6 +4682,7 @@ func TestAgent_JoinWAN_viaMeshGateway(t *testing.T) {
}
# wanfed
primary_gateways = ["` + gwAddr + `"]
retry_interval_wan = "250ms"
connect {
enabled = true
enable_mesh_gateway_wan_federation = true
Expand All @@ -4707,6 +4708,7 @@ func TestAgent_JoinWAN_viaMeshGateway(t *testing.T) {
}
# wanfed
primary_gateways = ["` + gwAddr + `"]
retry_interval_wan = "250ms"
connect {
enabled = true
enable_mesh_gateway_wan_federation = true
Expand Down
29 changes: 22 additions & 7 deletions agent/consul/gateway_locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ type GatewayLocator struct {
primaryDatacenter string

// these ONLY contain ones that have the wanfed:1 meta
gatewaysLock sync.Mutex
primaryGateways []string // WAN addrs
localGateways []string // LAN addrs
gatewaysLock sync.Mutex
primaryGateways []string // WAN addrs
localGateways []string // LAN addrs
populatedGateways bool

// primaryMeshGatewayDiscoveredAddresses is the current fallback addresses
// for the mesh gateways in the primary datacenter.
Expand Down Expand Up @@ -205,6 +206,10 @@ func (g *GatewayLocator) listGateways(primary bool) []string {
g.gatewaysLock.Lock()
defer g.gatewaysLock.Unlock()

if !g.populatedGateways {
return nil // don't even do anything yet
}

var addrs []string
if primary {
if g.datacenter == g.primaryDatacenter {
Expand Down Expand Up @@ -267,6 +272,7 @@ type serverDelegate interface {
blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error
IsLeader() bool
LeaderLastContact() time.Time
setDatacenterSupportsFederationStates()
}

func NewGatewayLocator(
Expand All @@ -283,6 +289,8 @@ func NewGatewayLocator(
primaryGatewaysReadyCh: make(chan struct{}),
}
g.logPrimaryDialingMessage(g.DialPrimaryThroughLocalGateway())
// initialize
g.SetLastFederationStateReplicationError(nil, false)
return g
}

Expand All @@ -292,17 +300,18 @@ func (g *GatewayLocator) Run(ctx context.Context) {
var lastFetchIndex uint64
retryLoopBackoff(ctx, func() error {
idx, err := g.runOnce(lastFetchIndex)
if err != nil {
if errors.Is(err, errGatewayLocalStateNotInitialized) {
// don't do exponential backoff for something that's not broken
return nil
} else if err != nil {
return err
}

lastFetchIndex = idx

return nil
}, func(err error) {
if !errors.Is(err, errGatewayLocalStateNotInitialized) {
g.logger.Error("error tracking primary and local mesh gateways", "error", err)
}
g.logger.Error("error tracking primary and local mesh gateways", "error", err)
})
}

Expand Down Expand Up @@ -367,6 +376,10 @@ func (g *GatewayLocator) checkLocalStateIsReady() error {
}

func (g *GatewayLocator) updateFromState(results []*structs.FederationState) {
if len(results) > 0 {
g.srv.setDatacenterSupportsFederationStates()
}

var (
local structs.CheckServiceNodes
primary structs.CheckServiceNodes
Expand All @@ -388,6 +401,8 @@ func (g *GatewayLocator) updateFromState(results []*structs.FederationState) {
g.gatewaysLock.Lock()
defer g.gatewaysLock.Unlock()

g.populatedGateways = true

changed := false
primaryReady := false
if !stringslice.Equal(g.primaryGateways, primaryAddrs) {
Expand Down
Loading