From a1b0dc245078f6e51588963ca69f220bc4b432d7 Mon Sep 17 00:00:00 2001 From: kisunji Date: Wed, 4 May 2022 12:47:39 -0400 Subject: [PATCH 1/4] Use native grpc round robin balancer --- agent/grpc/private/client.go | 5 ++-- agent/grpc/private/client_test.go | 10 ------- agent/grpc/private/resolver/resolver.go | 40 ------------------------- agent/router/grpc.go | 10 ------- agent/router/manager.go | 6 +--- agent/router/router.go | 6 +--- 6 files changed, 5 insertions(+), 72 deletions(-) diff --git a/agent/grpc/private/client.go b/agent/grpc/private/client.go index 8d10edd17501..2fc38c2759a8 100644 --- a/agent/grpc/private/client.go +++ b/agent/grpc/private/client.go @@ -8,6 +8,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/keepalive" "github.com/hashicorp/consul/agent/metadata" @@ -130,8 +131,8 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien grpc.WithContextDialer(c.dialer), grpc.WithDisableRetry(), grpc.WithStatsHandler(newStatsHandler(defaultMetrics())), - // nolint:staticcheck // there is no other supported alternative to WithBalancerName - grpc.WithBalancerName("pick_first"), + // nolint:staticcheck + grpc.WithBalancerName(roundrobin.Name), // Keep alive parameters are based on the same default ones we used for // Yamux. These are somewhat arbitrary but we did observe in scale testing // that the gRPC defaults (servers send keepalives only every 2 hours, diff --git a/agent/grpc/private/client_test.go b/agent/grpc/private/client_test.go index d414207abe17..0542bd7ecf43 100644 --- a/agent/grpc/private/client_test.go +++ b/agent/grpc/private/client_test.go @@ -373,21 +373,11 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) { first, err := client.Something(ctx, &testservice.Req{}) require.NoError(t, err) - t.Run("rebalance a different DC, does nothing", func(t *testing.T) { - res.NewRebalancer("dc-other")() - - resp, err := client.Something(ctx, &testservice.Req{}) - require.NoError(t, err) - require.Equal(t, resp.ServerName, first.ServerName) - }) - t.Run("rebalance the dc", func(t *testing.T) { // Rebalance is random, but if we repeat it a few times it should give us a // new server. attempts := 100 for i := 0; i < attempts; i++ { - res.NewRebalancer("dc1")() - resp, err := client.Something(ctx, &testservice.Req{}) require.NoError(t, err) if resp.ServerName != first.ServerName { diff --git a/agent/grpc/private/resolver/resolver.go b/agent/grpc/private/resolver/resolver.go index c0c3b8938abb..8b89d470a212 100644 --- a/agent/grpc/private/resolver/resolver.go +++ b/agent/grpc/private/resolver/resolver.go @@ -2,10 +2,8 @@ package resolver import ( "fmt" - "math/rand" "strings" "sync" - "time" "google.golang.org/grpc/resolver" @@ -43,31 +41,6 @@ func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder { } } -// NewRebalancer returns a function which shuffles the server list for resolvers -// in all datacenters. -func (s *ServerResolverBuilder) NewRebalancer(dc string) func() { - shuffler := rand.New(rand.NewSource(time.Now().UnixNano())) - return func() { - s.lock.RLock() - defer s.lock.RUnlock() - - for _, resolver := range s.resolvers { - if resolver.datacenter != dc { - continue - } - // Shuffle the list of addresses using the last list given to the resolver. - resolver.addrLock.Lock() - addrs := resolver.addrs - shuffler.Shuffle(len(addrs), func(i, j int) { - addrs[i], addrs[j] = addrs[j], addrs[i] - }) - // Pass the shuffled list to the resolver. - resolver.updateAddrsLocked(addrs) - resolver.addrLock.Unlock() - } - } -} - // ServerForGlobalAddr returns server metadata for a server with the specified globally unique address. func (s *ServerResolverBuilder) ServerForGlobalAddr(globalAddr string) (*metadata.Server, error) { s.lock.RLock() @@ -270,19 +243,6 @@ func (r *serverResolver) updateAddrs(addrs []resolver.Address) { // updateAddrsLocked updates this serverResolver's ClientConn to use the given // set of addrs. addrLock must be held by caller. func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) { - // Only pass the first address initially, which will cause the - // balancer to spin down the connection for its previous first address - // if it is different. If we don't do this, it will keep using the old - // first address as long as it is still in the list, making it impossible to - // rebalance until that address is removed. - var firstAddr []resolver.Address - if len(addrs) > 0 { - firstAddr = []resolver.Address{addrs[0]} - } - r.clientConn.UpdateState(resolver.State{Addresses: firstAddr}) - - // Call UpdateState again with the entire list of addrs in case we need them - // for failover. r.clientConn.UpdateState(resolver.State{Addresses: addrs}) r.addrs = addrs diff --git a/agent/router/grpc.go b/agent/router/grpc.go index 44600d42ad1d..b18835398508 100644 --- a/agent/router/grpc.go +++ b/agent/router/grpc.go @@ -8,24 +8,14 @@ import ( // ServerTracker is called when Router is notified of a server being added or // removed. type ServerTracker interface { - NewRebalancer(dc string) func() AddServer(types.AreaID, *metadata.Server) RemoveServer(types.AreaID, *metadata.Server) } -// Rebalancer is called periodically to re-order the servers so that the load on the -// servers is evenly balanced. -type Rebalancer func() - // NoOpServerTracker is a ServerTracker that does nothing. Used when gRPC is not // enabled. type NoOpServerTracker struct{} -// Rebalance does nothing -func (NoOpServerTracker) NewRebalancer(string) func() { - return func() {} -} - // AddServer does nothing func (NoOpServerTracker) AddServer(types.AreaID, *metadata.Server) {} diff --git a/agent/router/manager.go b/agent/router/manager.go index 8d2e9658eea8..ba5f4b276413 100644 --- a/agent/router/manager.go +++ b/agent/router/manager.go @@ -64,8 +64,6 @@ type Manager struct { // client.ConnPool. connPoolPinger Pinger - rebalancer Rebalancer - // serverName has the name of the managers's server. This is used to // short-circuit pinging to itself. serverName string @@ -235,7 +233,7 @@ func (m *Manager) saveServerList(l serverList) { } // New is the only way to safely create a new Manager struct. -func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger, serverName string, rb Rebalancer) (m *Manager) { +func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger, serverName string) (m *Manager) { if logger == nil { logger = hclog.New(&hclog.LoggerOptions{}) } @@ -246,7 +244,6 @@ func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfC m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle m.rebalanceTimer = time.NewTimer(delayer.MinDelay) m.shutdownCh = shutdownCh - m.rebalancer = rb m.serverName = serverName atomic.StoreInt32(&m.offline, 1) @@ -481,7 +478,6 @@ func (m *Manager) Run() { for { select { case <-m.rebalanceTimer.C: - m.rebalancer() m.RebalanceServers() delay := delayer.Delay(len(m.getServerList().servers), m.clusterInfo.NumNodes()) m.rebalanceTimer.Reset(delay) diff --git a/agent/router/router.go b/agent/router/router.go index 1389a30f6ce7..30910d57481f 100644 --- a/agent/router/router.go +++ b/agent/router/router.go @@ -96,9 +96,6 @@ func NewRouter(logger hclog.Logger, localDatacenter, serverName string, tracker if logger == nil { logger = hclog.New(&hclog.LoggerOptions{}) } - if tracker == nil { - tracker = NoOpServerTracker{} - } router := &Router{ logger: logger.Named(logging.Router), @@ -260,8 +257,7 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager { } shutdownCh := make(chan struct{}) - rb := r.grpcServerTracker.NewRebalancer(dc) - manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName, rb) + manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName) info = &managerInfo{ manager: manager, shutdownCh: shutdownCh, From 16b36d897ac313cd18ec22de1209b77801b614d0 Mon Sep 17 00:00:00 2001 From: kisunji Date: Wed, 4 May 2022 13:55:54 -0400 Subject: [PATCH 2/4] Fix tests --- agent/router/manager_internal_test.go | 6 ++---- agent/router/manager_test.go | 8 ++++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/agent/router/manager_internal_test.go b/agent/router/manager_internal_test.go index 640e96a06234..0b1fa17b1e83 100644 --- a/agent/router/manager_internal_test.go +++ b/agent/router/manager_internal_test.go @@ -53,16 +53,14 @@ func (s *fauxSerf) NumNodes() int { func testManager() (m *Manager) { logger := GetBufferedLogger() shutdownCh := make(chan struct{}) - m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}, "", noopRebalancer) + m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}, "") return m } -func noopRebalancer() {} - func testManagerFailProb(failPct float64) (m *Manager) { logger := GetBufferedLogger() shutdownCh := make(chan struct{}) - m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "", noopRebalancer) + m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "") return m } diff --git a/agent/router/manager_test.go b/agent/router/manager_test.go index e21ab340f40b..08bf059be9d1 100644 --- a/agent/router/manager_test.go +++ b/agent/router/manager_test.go @@ -57,7 +57,7 @@ func (s *fauxSerf) NumNodes() int { func testManager(t testing.TB) (m *router.Manager) { logger := testutil.Logger(t) shutdownCh := make(chan struct{}) - m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "", noopRebalancer) + m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "") return m } @@ -66,14 +66,14 @@ func noopRebalancer() {} func testManagerFailProb(t testing.TB, failPct float64) (m *router.Manager) { logger := testutil.Logger(t) shutdownCh := make(chan struct{}) - m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "", noopRebalancer) + m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "") return m } func testManagerFailAddr(t testing.TB, failAddr net.Addr) (m *router.Manager) { logger := testutil.Logger(t) shutdownCh := make(chan struct{}) - m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failAddr: failAddr}, "", noopRebalancer) + m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failAddr: failAddr}, "") return m } @@ -197,7 +197,7 @@ func TestServers_FindServer(t *testing.T) { func TestServers_New(t *testing.T) { logger := testutil.Logger(t) shutdownCh := make(chan struct{}) - m := router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "", noopRebalancer) + m := router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "") if m == nil { t.Fatalf("Manager nil") } From e6fc5be4287d4a98f71fa33b23b016f08973aab0 Mon Sep 17 00:00:00 2001 From: kisunji Date: Tue, 17 May 2022 10:35:20 -0400 Subject: [PATCH 3/4] Re-add validation --- agent/router/router.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/agent/router/router.go b/agent/router/router.go index 30910d57481f..0f5845b30d6a 100644 --- a/agent/router/router.go +++ b/agent/router/router.go @@ -96,6 +96,9 @@ func NewRouter(logger hclog.Logger, localDatacenter, serverName string, tracker if logger == nil { logger = hclog.New(&hclog.LoggerOptions{}) } + if tracker == nil { + tracker = NoOpServerTracker{} + } router := &Router{ logger: logger.Named(logging.Router), From 383e150e3e9af33926c622be38187fbd7a854805 Mon Sep 17 00:00:00 2001 From: kisunji Date: Tue, 17 May 2022 12:26:47 -0400 Subject: [PATCH 4/4] Fix test flakiness --- agent/consul/subscribe_backend_test.go | 10 +++++----- agent/grpc/private/resolver/resolver.go | 2 -- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/agent/consul/subscribe_backend_test.go b/agent/consul/subscribe_backend_test.go index 7a22eace5e49..1692d9312df9 100644 --- a/agent/consul/subscribe_backend_test.go +++ b/agent/consul/subscribe_backend_test.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" ) @@ -204,11 +205,10 @@ func TestSubscribeBackend_IntegrationWithServer_TLSReload(t *testing.T) { server.tlsConfigurator.Update(newConf) // Try the subscribe call again - retryFailedConn(t, conn) - - streamClient = pbsubscribe.NewStateChangeSubscriptionClient(conn) - _, err = streamClient.Subscribe(ctx, req) - require.NoError(t, err) + retry.Run(t, func(r *retry.R) { + _, err = streamClient.Subscribe(ctx, req) + require.NoError(r, err) + }) } func clientConfigVerifyOutgoing(config *Config) { diff --git a/agent/grpc/private/resolver/resolver.go b/agent/grpc/private/resolver/resolver.go index 8b89d470a212..0442e4b7bc4b 100644 --- a/agent/grpc/private/resolver/resolver.go +++ b/agent/grpc/private/resolver/resolver.go @@ -244,8 +244,6 @@ func (r *serverResolver) updateAddrs(addrs []resolver.Address) { // set of addrs. addrLock must be held by caller. func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) { r.clientConn.UpdateState(resolver.State{Addresses: addrs}) - - r.addrs = addrs } func (r *serverResolver) Close() {