Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use native grpc round robin balancer #12940

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions agent/consul/subscribe_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
)

Expand Down Expand Up @@ -204,11 +205,10 @@ func TestSubscribeBackend_IntegrationWithServer_TLSReload(t *testing.T) {
server.tlsConfigurator.Update(newConf)

// Try the subscribe call again
retryFailedConn(t, conn)

streamClient = pbsubscribe.NewStateChangeSubscriptionClient(conn)
Comment on lines -207 to -209
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see a need for this code to hard-reset the connection; removing it did not impact the test (nor was it the reason I added the retry below).

_, err = streamClient.Subscribe(ctx, req)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
_, err = streamClient.Subscribe(ctx, req)
require.NoError(r, err)
})
Comment on lines +208 to +211
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a slight timing issue (<1s) that makes the second attempt flaky. I'm unsure why the previous code did not run into the same issue, but I didn't consider this to be a significant regression.

}

func clientConfigVerifyOutgoing(config *Config) {
Expand Down
5 changes: 3 additions & 2 deletions agent/grpc/private/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/keepalive"

"github.com/hashicorp/consul/agent/metadata"
Expand Down Expand Up @@ -130,8 +131,8 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien
grpc.WithContextDialer(c.dialer),
grpc.WithDisableRetry(),
grpc.WithStatsHandler(newStatsHandler(defaultMetrics())),
// nolint:staticcheck // there is no other supported alternative to WithBalancerName
grpc.WithBalancerName("pick_first"),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// nolint:staticcheck
grpc.WithBalancerName(roundrobin.Name),
// Keep alive parameters are based on the same default ones we used for
// Yamux. These are somewhat arbitrary but we did observe in scale testing
// that the gRPC defaults (servers send keepalives only every 2 hours,
Expand Down
10 changes: 0 additions & 10 deletions agent/grpc/private/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,21 +373,11 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {
first, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)

t.Run("rebalance a different DC, does nothing", func(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer a relevant test since calls are round-robin

res.NewRebalancer("dc-other")()

resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
require.Equal(t, resp.ServerName, first.ServerName)
})

t.Run("rebalance the dc", func(t *testing.T) {
// Rebalance is random, but if we repeat it a few times it should give us a
// new server.
attempts := 100
for i := 0; i < attempts; i++ {
res.NewRebalancer("dc1")()

resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
if resp.ServerName != first.ServerName {
Expand Down
42 changes: 0 additions & 42 deletions agent/grpc/private/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package resolver

import (
"fmt"
"math/rand"
"strings"
"sync"
"time"

"google.golang.org/grpc/resolver"

Expand Down Expand Up @@ -43,31 +41,6 @@ func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder {
}
}

// NewRebalancer returns a function which shuffles the server list for
// resolvers in the given datacenter. (The previous comment claimed "all
// datacenters", but the loop below explicitly skips resolvers whose
// datacenter does not match dc.)
func (s *ServerResolverBuilder) NewRebalancer(dc string) func() {
	// Seed a private source so repeated calls produce different shuffles.
	shuffler := rand.New(rand.NewSource(time.Now().UnixNano()))
	return func() {
		s.lock.RLock()
		defer s.lock.RUnlock()

		// Renamed loop variable: the original `resolver` shadowed the
		// imported google.golang.org/grpc/resolver package.
		for _, res := range s.resolvers {
			if res.datacenter != dc {
				continue
			}
			// Shuffle the list of addresses using the last list given to the resolver.
			res.addrLock.Lock()
			addrs := res.addrs
			shuffler.Shuffle(len(addrs), func(i, j int) {
				addrs[i], addrs[j] = addrs[j], addrs[i]
			})
			// Pass the shuffled list to the resolver.
			res.updateAddrsLocked(addrs)
			res.addrLock.Unlock()
		}
	}
}

// ServerForGlobalAddr returns server metadata for a server with the specified globally unique address.
func (s *ServerResolverBuilder) ServerForGlobalAddr(globalAddr string) (*metadata.Server, error) {
s.lock.RLock()
Expand Down Expand Up @@ -270,22 +243,7 @@ func (r *serverResolver) updateAddrs(addrs []resolver.Address) {
// updateAddrsLocked updates this serverResolver's ClientConn to use the given
// set of addrs. addrLock must be held by caller.
func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) {
// Only pass the first address initially, which will cause the
// balancer to spin down the connection for its previous first address
// if it is different. If we don't do this, it will keep using the old
// first address as long as it is still in the list, making it impossible to
// rebalance until that address is removed.
var firstAddr []resolver.Address
if len(addrs) > 0 {
firstAddr = []resolver.Address{addrs[0]}
}
r.clientConn.UpdateState(resolver.State{Addresses: firstAddr})

// Call UpdateState again with the entire list of addrs in case we need them
// for failover.
Comment on lines -273 to -285
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do want to preserve the old behavior of killing streams (almost) every time server addresses are shuffled, we could add some logic here.

r.clientConn.UpdateState(resolver.State{Addresses: addrs})

r.addrs = addrs
}

func (r *serverResolver) Close() {
Expand Down
10 changes: 0 additions & 10 deletions agent/router/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,14 @@ import (
// ServerTracker is called when Router is notified of a server being added or
// removed.
type ServerTracker interface {
	// NewRebalancer returns a function that, each time it is invoked,
	// re-shuffles the tracked servers for the given datacenter.
	NewRebalancer(dc string) func()
	// AddServer is called when a server joins the given network area.
	AddServer(types.AreaID, *metadata.Server)
	// RemoveServer is called when a server leaves the given network area.
	RemoveServer(types.AreaID, *metadata.Server)
}

// Rebalancer is a function invoked periodically to re-order the server list
// so that request load is spread evenly across the servers.
type Rebalancer func()

// NoOpServerTracker is a ServerTracker that does nothing. Used when gRPC is not
// enabled.
type NoOpServerTracker struct{}

// NewRebalancer returns a function that does nothing.
// (Fixed: the previous doc comment began with "Rebalance", which names no
// method here; Go doc comments must start with the declared identifier.)
func (NoOpServerTracker) NewRebalancer(string) func() {
	return func() {}
}

// AddServer does nothing; the no-op tracker ignores server additions.
func (NoOpServerTracker) AddServer(types.AreaID, *metadata.Server) {}

Expand Down
6 changes: 1 addition & 5 deletions agent/router/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ type Manager struct {
// client.ConnPool.
connPoolPinger Pinger

rebalancer Rebalancer

// serverName has the name of the managers's server. This is used to
// short-circuit pinging to itself.
serverName string
Expand Down Expand Up @@ -235,7 +233,7 @@ func (m *Manager) saveServerList(l serverList) {
}

// New is the only way to safely create a new Manager struct.
func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger, serverName string, rb Rebalancer) (m *Manager) {
func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger, serverName string) (m *Manager) {
if logger == nil {
logger = hclog.New(&hclog.LoggerOptions{})
}
Expand All @@ -246,7 +244,6 @@ func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfC
m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
m.rebalanceTimer = time.NewTimer(delayer.MinDelay)
m.shutdownCh = shutdownCh
m.rebalancer = rb
m.serverName = serverName
atomic.StoreInt32(&m.offline, 1)

Expand Down Expand Up @@ -481,7 +478,6 @@ func (m *Manager) Run() {
for {
select {
case <-m.rebalanceTimer.C:
m.rebalancer()
m.RebalanceServers()
delay := delayer.Delay(len(m.getServerList().servers), m.clusterInfo.NumNodes())
m.rebalanceTimer.Reset(delay)
Expand Down
6 changes: 2 additions & 4 deletions agent/router/manager_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,14 @@ func (s *fauxSerf) NumNodes() int {
func testManager() (m *Manager) {
logger := GetBufferedLogger()
shutdownCh := make(chan struct{})
m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}, "", noopRebalancer)
m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}, "")
return m
}

func noopRebalancer() {}

func testManagerFailProb(failPct float64) (m *Manager) {
logger := GetBufferedLogger()
shutdownCh := make(chan struct{})
m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "", noopRebalancer)
m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "")
return m
}

Expand Down
8 changes: 4 additions & 4 deletions agent/router/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *fauxSerf) NumNodes() int {
func testManager(t testing.TB) (m *router.Manager) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "", noopRebalancer)
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "")
return m
}

Expand All @@ -66,14 +66,14 @@ func noopRebalancer() {}
func testManagerFailProb(t testing.TB, failPct float64) (m *router.Manager) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "", noopRebalancer)
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "")
return m
}

func testManagerFailAddr(t testing.TB, failAddr net.Addr) (m *router.Manager) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failAddr: failAddr}, "", noopRebalancer)
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failAddr: failAddr}, "")
return m
}

Expand Down Expand Up @@ -197,7 +197,7 @@ func TestServers_FindServer(t *testing.T) {
func TestServers_New(t *testing.T) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m := router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "", noopRebalancer)
m := router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "")
if m == nil {
t.Fatalf("Manager nil")
}
Expand Down
3 changes: 1 addition & 2 deletions agent/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,7 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager {
}

shutdownCh := make(chan struct{})
rb := r.grpcServerTracker.NewRebalancer(dc)
manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName, rb)
manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName)
info = &managerInfo{
manager: manager,
shutdownCh: shutdownCh,
Expand Down