From 255492bb2d80bfaeaa68a7c93aae37c47f1f79d8 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Sun, 8 Apr 2018 03:57:01 -0700 Subject: [PATCH 1/6] add unit tests: limits configuration should be reloadable --- agent/agent_endpoint_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 940304b54892..f4a800b81dbd 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -281,6 +281,10 @@ func TestAgent_Reload(t *testing.T) { handler = "true" } ] + limits = { + rpc_rate=1 + rpc_max_burst=100 + } `) defer a.Shutdown() @@ -302,6 +306,10 @@ func TestAgent_Reload(t *testing.T) { name = "redis-reloaded" } ] + limits = { + rpc_rate=2 + rpc_max_burst=200 + } `, }) @@ -312,6 +320,14 @@ func TestAgent_Reload(t *testing.T) { t.Fatal("missing redis-reloaded service") } + if a.config.RPCRateLimit != 2 { + t.Fatalf("RPC rate not set correctly. Got %v. Want 2", a.config.RPCRateLimit) + } + + if a.config.RPCMaxBurst != 200 { + t.Fatalf("RPC max burst not set correctly. Got %v. Want 200", a.config.RPCMaxBurst) + } + for _, wp := range a.watchPlans { if !wp.IsStopped() { t.Fatalf("Reloading configs should stop watch plans of the previous configuration") From 672a2a3577278bf24924911136add6a385923bbd Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Sun, 8 Apr 2018 14:28:29 -0700 Subject: [PATCH 2/6] agent: reload limits upon restart --- agent/agent.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/agent/agent.go b/agent/agent.go index 800302c1aed6..88dbf8d99008 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2479,6 +2479,11 @@ func (a *Agent) DisableNodeMaintenance() { a.logger.Printf("[INFO] agent: Node left maintenance mode") } +func (a *Agent) loadLimits(conf *config.RuntimeConfig) { + a.config.RPCRateLimit = conf.RPCRateLimit + a.config.RPCMaxBurst = conf.RPCMaxBurst +} + func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { // Bulk update the services and checks a.PauseSync() @@ -2513,6 +2518,8 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { return fmt.Errorf("Failed reloading watches: %v", err) } + a.loadLimits(newCfg) + // Update filtered metrics metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes) From 65746b2f8fa726acb1774b1849e04d9d4dba4c9d Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Mon, 11 Jun 2018 15:51:17 -0400 Subject: [PATCH 3/6] Apply the limits to the clients rpcLimiter --- agent/agent.go | 17 ++++++++++++++--- agent/consul/client.go | 17 +++++++++++++---- agent/consul/server.go | 6 ++++++ 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 88dbf8d99008..caf515084452 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -73,6 +73,7 @@ type delegate interface { SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error Shutdown() error Stats() map[string]map[string]string + ReloadConfig(config *consul.Config) error } // notifier is called after a successful JoinLAN. @@ -2480,8 +2481,8 @@ func (a *Agent) DisableNodeMaintenance() { } func (a *Agent) loadLimits(conf *config.RuntimeConfig) { - a.config.RPCRateLimit = conf.RPCRateLimit - a.config.RPCMaxBurst = conf.RPCMaxBurst + a.config.RPCRateLimit = conf.RPCRateLimit + a.config.RPCMaxBurst = conf.RPCMaxBurst } func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { @@ -2518,7 +2519,17 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { return fmt.Errorf("Failed reloading watches: %v", err) } - a.loadLimits(newCfg) + a.loadLimits(newCfg) + + // create the config for the rpc server/client + consulCfg, err := a.consulConfig() + if err != nil { + return err + } + + if err := a.delegate.ReloadConfig(consulCfg); err != nil { + return err + } // Update filtered metrics metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes) diff --git a/agent/consul/client.go b/agent/consul/client.go index 96baeb174802..84e1a7319997 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" "github.com/armon/go-metrics" @@ -56,7 +57,7 @@ type Client struct { // rpcLimiter is used to rate limit the total number of RPCs initiated // from an agent. - rpcLimiter *rate.Limiter + rpcLimiter atomic.Value // eventCh is used to receive events from the // serf cluster in the datacenter @@ -125,12 +126,13 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) { c := &Client{ config: config, connPool: connPool, - rpcLimiter: rate.NewLimiter(config.RPCRate, config.RPCMaxBurst), eventCh: make(chan serf.Event, serfEventBacklog), logger: logger, shutdownCh: make(chan struct{}), } + c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst)) + // Initialize the LAN Serf c.serf, err = c.setupSerf(config.SerfLANConfig, c.eventCh, serfLANSnapshot) @@ -251,7 +253,7 @@ TRY: // Enforce the RPC limit. metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1) metrics.IncrCounter([]string{"client", "rpc"}, 1) - if !c.rpcLimiter.Allow() { + if !c.rpcLimiter.Load().(*rate.Limiter).Allow() { metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1) metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1) return structs.ErrRPCRateExceeded @@ -295,7 +297,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io // Enforce the RPC limit. metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1) metrics.IncrCounter([]string{"client", "rpc"}, 1) - if !c.rpcLimiter.Allow() { + if !c.rpcLimiter.Load().(*rate.Limiter).Allow() { metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1) metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1) return structs.ErrRPCRateExceeded @@ -360,3 +362,10 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) { cs := lib.CoordinateSet{c.config.Segment: lan} return cs, nil } + +// ReloadConfig is used to have the Client do an online reload of +// relevant configuration information +func (c *Client) ReloadConfig(config *Config) error { + c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst)) + return nil +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 128f67081c08..b1aca96a6212 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -1040,6 +1040,12 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) { return cs, nil } +// ReloadConfig is used to have the Server do an online reload of +// relevant configuration information +func (s *Server) ReloadConfig(config *Config) error { + return nil +} + // Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write func (s *Server) setConsistentReadReady() { atomic.StoreInt32(&s.readyForConsistentReads, 1) From 0f5798f028d8dfc27ece3c3eac3ac3bbd008a337 Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Mon, 11 Jun 2018 15:54:55 -0400 Subject: [PATCH 4/6] Update docs about rpc limits being reloadable --- website/source/docs/agent/options.html.md | 1 + 1 file changed, 1 insertion(+) diff --git a/website/source/docs/agent/options.html.md b/website/source/docs/agent/options.html.md index 0dcf866e2ec3..cc01aef4ad5a 100644 --- a/website/source/docs/agent/options.html.md +++ b/website/source/docs/agent/options.html.md @@ -1483,3 +1483,4 @@ items which are reloaded include: * Node Metadata * Metric Prefix Filter * Discard Check Output +* RPC rate limiting \ No newline at end of file From 0df7cd22aaf2f2e6e32bc66b53568efd6b2e41cd Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Mon, 11 Jun 2018 16:23:51 -0400 Subject: [PATCH 5/6] Add a Client ReloadConfig test --- agent/consul/client_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index 20647b3f6635..f61541b5efa5 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -15,6 +15,8 @@ import ( "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/serf/serf" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" ) func testClientConfig(t *testing.T) (string, *Config) { @@ -665,3 +667,25 @@ func TestClient_Encrypted(t *testing.T) { t.Fatalf("should be encrypted") } } + +func TestClient_Reload(t *testing.T) { + t.Parallel() + dir1, c := testClientWithConfig(t, func(c *Config) { + c.RPCRate = 500 + c.RPCMaxBurst = 5000 + }) + defer os.RemoveAll(dir1) + defer c.Shutdown() + + limiter := c.rpcLimiter.Load().(*rate.Limiter) + require.Equal(t, rate.Limit(500), limiter.Limit()) + require.Equal(t, 5000, limiter.Burst()) + + c.config.RPCRate = 1000 + c.config.RPCMaxBurst = 10000 + + require.NoError(t, c.ReloadConfig(c.config)) + limiter = c.rpcLimiter.Load().(*rate.Limiter) + require.Equal(t, rate.Limit(1000), limiter.Limit()) + require.Equal(t, 10000, limiter.Burst()) +} From 40e6d9c72064b29e196f4a9d96894d4eb299d1dc Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Mon, 11 Jun 2018 16:27:39 -0400 Subject: [PATCH 6/6] Fixup a weird merge problem --- agent/agent.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 6d9f80252bf4..cacd085216fb 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1848,11 +1848,6 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, check.CheckID, checks.MinInterval)) chkType.Interval = checks.MinInterval } - if chkType.Script != "" { - a.logger.Printf("[WARN] agent: check %q has the 'script' field, which has been deprecated "+ - "and replaced with the 'args' field. See https://www.consul.io/docs/agent/checks.html", - check.CheckID) - } if a.dockerClient == nil { dc, err := checks.NewDockerClient(os.Getenv("DOCKER_HOST"), checks.BufSize) @@ -1890,11 +1885,6 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, check.CheckID, checks.MinInterval) chkType.Interval = checks.MinInterval } - if chkType.Script != "" { - a.logger.Printf("[WARN] agent: check %q has the 'script' field, which has been deprecated "+ - "and replaced with the 'args' field. See https://www.consul.io/docs/agent/checks.html", - check.CheckID) - } monitor := &checks.CheckMonitor{ Notify: a.State,