Skip to content

Commit

Permalink
server: use the presense of stored federation state data as a sign th…
Browse files Browse the repository at this point in the history
…at we already activated the federation state feature flag (#9519)

This way we only have to wait for the serf barrier to pass once before
we can make use of federation state APIs Without this patch every
restart needs to re-compute the change.
  • Loading branch information
rboyer authored and hashicorp-ci committed Jan 28, 2021
1 parent a89a803 commit fa9b61b
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .changelog/9519.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
server: use the presense of stored federation state data as a sign that we already activated the federation state feature flag
```
10 changes: 10 additions & 0 deletions agent/consul/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ func waitForNewACLReplication(t *testing.T, server *Server, expectedReplicationT
})
}

func waitForFederationStateFeature(t *testing.T, server *Server) {
t.Helper()

retry.Run(t, func(r *retry.R) {
require.True(r, server.DatacenterSupportsFederationStates())
})

require.True(t, server.DatacenterSupportsFederationStates())
}

func seeEachOther(a, b []serf.Member, addra, addrb string) bool {
return serfMembersContains(a, addrb) && serfMembersContains(b, addra)
}
Expand Down
6 changes: 5 additions & 1 deletion agent/consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,10 @@ func (s *Server) reapTombstones(index uint64) {
}
}

func (s *Server) setDatacenterSupportsFederationStates() {
atomic.StoreInt32(&s.dcSupportsFederationStates, 1)
}

func (s *Server) DatacenterSupportsFederationStates() bool {
if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 {
return true
Expand All @@ -1510,7 +1514,7 @@ func (s *Server) DatacenterSupportsFederationStates() bool {
s.router.CheckServers(s.config.Datacenter, state.update)

if state.supported && state.found {
atomic.StoreInt32(&s.dcSupportsFederationStates, 1)
s.setDatacenterSupportsFederationStates()
return true
}

Expand Down
10 changes: 10 additions & 0 deletions agent/consul/leader_federation_state_ae.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ const (
)

func (s *Server) startFederationStateAntiEntropy() {
// Check to see if we can skip waiting for serf feature detection below.
if !s.DatacenterSupportsFederationStates() {
_, fedStates, err := s.fsm.State().FederationStateList(nil)
if err != nil {
s.logger.Warn("Failed to check for existing federation states and activate the feature flag quicker; skipping this optimization", "error", err)
} else if len(fedStates) > 0 {
s.setDatacenterSupportsFederationStates()
}
}

if s.config.DisableFederationStateAntiEntropy {
return
}
Expand Down
77 changes: 77 additions & 0 deletions agent/consul/leader_federation_state_ae_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,83 @@ import (
"github.com/stretchr/testify/require"
)

func TestLeader_FederationStateAntiEntropy_FeatureIsStickyEvenIfSerfTagsRegress(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

t.Parallel()

// We test this by having two datacenters with one server each. They
// initially come up and pass the serf barrier, then we power them both
// off. We leave the primary off permanently, and then we stand up the
// secondary. Hopefully it should transition to allow federation states.

dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

waitForLeaderEstablishment(t, s1)

dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.FederationStateReplicationRate = 100
c.FederationStateReplicationBurst = 100
c.FederationStateReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
codec2 := rpcClient(t, s2)
defer codec2.Close()

waitForLeaderEstablishment(t, s2)

// Create the WAN link
joinWAN(t, s2, s1)
waitForLeaderEstablishment(t, s1)
waitForLeaderEstablishment(t, s2)

waitForFederationStateFeature(t, s1)
waitForFederationStateFeature(t, s2)

// Wait for everybody's AE to complete.
retry.Run(t, func(r *retry.R) {
_, states, err := s1.fsm.State().FederationStateList(nil)
require.NoError(r, err)
require.Len(r, states, 2)
})

// Shutdown s1 and s2.
s1.Shutdown()
s2.Shutdown()

// Restart just s2

dir2new, s2new := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.FederationStateReplicationRate = 100
c.FederationStateReplicationBurst = 100
c.FederationStateReplicationApplyLimit = 1000000

c.DataDir = s2.config.DataDir
c.NodeName = s2.config.NodeName
c.NodeID = s2.config.NodeID
})
defer os.RemoveAll(dir2new)
defer s2new.Shutdown()

waitForLeaderEstablishment(t, s2new)

// It should be able to transition without connectivity to the primary.
waitForFederationStateFeature(t, s2new)
}

func TestLeader_FederationStateAntiEntropy_BlockingQuery(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit fa9b61b

Please sign in to comment.