Skip to content

Commit

Permalink
Call RemoveServer for reap events (#5317)
Browse files Browse the repository at this point in the history
This ensures that servers are removed from RPC routing when they are reaped.
  • Loading branch information
mkeeler authored Mar 4, 2019
1 parent 409c901 commit 200c0fb
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 7 deletions.
3 changes: 1 addition & 2 deletions agent/consul/client_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,11 @@ func (c *Client) lanEventHandler() {
switch e.EventType() {
case serf.EventMemberJoin:
c.nodeJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
c.nodeFail(e.(serf.MemberEvent))
case serf.EventUser:
c.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
case serf.EventQuery: // Ignore
default:
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
Expand Down
40 changes: 40 additions & 0 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,46 @@ func TestClient_JoinLAN(t *testing.T) {
})
}

func TestClient_LANReap(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)

dir2, c1 := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.SerfFloodInterval = 100 * time.Millisecond
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
})
defer os.RemoveAll(dir2)
defer c1.Shutdown()

// Try to join
joinLAN(t, c1, s1)
testrpc.WaitForLeader(t, c1.RPC, "dc1")

retry.Run(t, func(r *retry.R) {
require.Len(r, s1.LANMembers(), 2)
require.Len(r, c1.LANMembers(), 2)
})

// Check the router has both
retry.Run(t, func(r *retry.R) {
server := c1.routers.FindServer()
require.NotNil(t, server)
require.Equal(t, s1.config.NodeName, server.Name)
})

// shutdown the second dc
s1.Shutdown()

retry.Run(t, func(r *retry.R) {
require.Len(r, c1.LANMembers(), 1)
server := c1.routers.FindServer()
require.Nil(t, server)
})
}

func TestClient_JoinLAN_Invalid(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
Expand Down
4 changes: 1 addition & 3 deletions agent/consul/server_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,10 @@ func (s *Server) lanEventHandler() {
s.lanNodeJoin(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))

case serf.EventMemberLeave, serf.EventMemberFailed:
case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
s.lanNodeFailed(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))

case serf.EventMemberReap:
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventUser:
s.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate:
Expand Down
103 changes: 103 additions & 0 deletions agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"

"github.com/stretchr/testify/require"
)

func configureTLS(config *Config) {
Expand Down Expand Up @@ -236,6 +238,67 @@ func TestServer_JoinLAN(t *testing.T) {
})
}

func TestServer_LANReap(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.SerfFloodInterval = 100 * time.Millisecond
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()

dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.SerfFloodInterval = 100 * time.Millisecond
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
})
defer os.RemoveAll(dir2)

dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = false
c.SerfFloodInterval = 100 * time.Millisecond
c.SerfLANConfig.ReconnectTimeout = 250 * time.Millisecond
c.SerfLANConfig.ReapInterval = 500 * time.Millisecond
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()

// Try to join
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)

testrpc.WaitForLeader(t, s3.RPC, "dc1")

retry.Run(t, func(r *retry.R) {
require.Len(r, s1.LANMembers(), 3)
require.Len(r, s2.LANMembers(), 3)
require.Len(r, s3.LANMembers(), 3)
})

// Check the router has both
retry.Run(t, func(r *retry.R) {
require.Len(r, s1.serverLookup.Servers(), 3)
require.Len(r, s2.serverLookup.Servers(), 3)
require.Len(r, s3.serverLookup.Servers(), 3)
})

// shutdown the second dc
s2.Shutdown()

retry.Run(t, func(r *retry.R) {
require.Len(r, s1.LANMembers(), 2)
servers := s1.serverLookup.Servers()
require.Len(r, servers, 2)
// require.Equal(r, s1.config.NodeName, servers[0].Name)
})
}

func TestServer_JoinWAN(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
Expand Down Expand Up @@ -268,6 +331,46 @@ func TestServer_JoinWAN(t *testing.T) {
})
}

func TestServer_WANReap(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.SerfFloodInterval = 100 * time.Millisecond
c.SerfWANConfig.ReconnectTimeout = 250 * time.Millisecond
c.SerfWANConfig.ReapInterval = 500 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()

dir2, s2 := testServerDC(t, "dc2")
defer os.RemoveAll(dir2)

// Try to join
joinWAN(t, s2, s1)
retry.Run(t, func(r *retry.R) {
require.Len(r, s1.WANMembers(), 2)
require.Len(r, s2.WANMembers(), 2)
})

// Check the router has both
retry.Run(t, func(r *retry.R) {
require.Len(r, s1.router.GetDatacenters(), 2)
require.Len(r, s2.router.GetDatacenters(), 2)
})

// shutdown the second dc
s2.Shutdown()

retry.Run(t, func(r *retry.R) {
require.Len(r, s1.WANMembers(), 1)
datacenters := s1.router.GetDatacenters()
require.Len(r, datacenters, 1)
require.Equal(r, "dc1", datacenters[0])
})

}

func TestServer_JoinWAN_Flood(t *testing.T) {
t.Parallel()
// Set up two servers in a WAN.
Expand Down
3 changes: 1 addition & 2 deletions agent/router/serf_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,14 @@ func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, s
case serf.EventMemberJoin:
handleMemberEvent(logger, router.AddServer, areaID, e)

case serf.EventMemberLeave:
case serf.EventMemberLeave, serf.EventMemberReap:
handleMemberEvent(logger, router.RemoveServer, areaID, e)

case serf.EventMemberFailed:
handleMemberEvent(logger, router.FailServer, areaID, e)

// All of these event types are ignored.
case serf.EventMemberUpdate:
case serf.EventMemberReap:
case serf.EventUser:
case serf.EventQuery:

Expand Down

0 comments on commit 200c0fb

Please sign in to comment.