Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RPC limits reloadable #4216

Merged
merged 7 commits into from
Jun 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type delegate interface {
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
Shutdown() error
Stats() map[string]map[string]string
ReloadConfig(config *consul.Config) error
enterpriseDelegate
}

Expand Down Expand Up @@ -2491,6 +2492,11 @@ func (a *Agent) DisableNodeMaintenance() {
a.logger.Printf("[INFO] agent: Node left maintenance mode")
}

// loadLimits copies the hot-reloadable RPC limit settings from a
// freshly parsed runtime configuration into the agent's active
// configuration, so a subsequent consulConfig() call picks them up.
func (a *Agent) loadLimits(src *config.RuntimeConfig) {
	a.config.RPCMaxBurst = src.RPCMaxBurst
	a.config.RPCRateLimit = src.RPCRateLimit
}

func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
// Bulk update the services and checks
a.PauseSync()
Expand Down Expand Up @@ -2525,6 +2531,18 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
return fmt.Errorf("Failed reloading watches: %v", err)
}

a.loadLimits(newCfg)

// create the config for the rpc server/client
consulCfg, err := a.consulConfig()
if err != nil {
return err
}

if err := a.delegate.ReloadConfig(consulCfg); err != nil {
return err
}

// Update filtered metrics
metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes)

Expand Down
16 changes: 16 additions & 0 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ func TestAgent_Reload(t *testing.T) {
handler = "true"
}
]
limits = {
rpc_rate=1
rpc_max_burst=100
}
`)
defer a.Shutdown()

Expand All @@ -302,6 +306,10 @@ func TestAgent_Reload(t *testing.T) {
name = "redis-reloaded"
}
]
limits = {
rpc_rate=2
rpc_max_burst=200
}
`,
})

Expand All @@ -312,6 +320,14 @@ func TestAgent_Reload(t *testing.T) {
t.Fatal("missing redis-reloaded service")
}

if a.config.RPCRateLimit != 2 {
t.Fatalf("RPC rate not set correctly. Got %v. Want 2", a.config.RPCRateLimit)
}

if a.config.RPCMaxBurst != 200 {
t.Fatalf("RPC max burst not set correctly. Got %v. Want 200", a.config.RPCMaxBurst)
}

for _, wp := range a.watchPlans {
if !wp.IsStopped() {
t.Fatalf("Reloading configs should stop watch plans of the previous configuration")
Expand Down
17 changes: 13 additions & 4 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -56,7 +57,7 @@ type Client struct {

// rpcLimiter is used to rate limit the total number of RPCs initiated
// from an agent.
rpcLimiter *rate.Limiter
rpcLimiter atomic.Value

// eventCh is used to receive events from the
// serf cluster in the datacenter
Expand Down Expand Up @@ -128,12 +129,13 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
c := &Client{
config: config,
connPool: connPool,
rpcLimiter: rate.NewLimiter(config.RPCRate, config.RPCMaxBurst),
eventCh: make(chan serf.Event, serfEventBacklog),
logger: logger,
shutdownCh: make(chan struct{}),
}

c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))

if err := c.initEnterprise(); err != nil {
c.Shutdown()
return nil, err
Expand Down Expand Up @@ -263,7 +265,7 @@ TRY:

// Enforce the RPC limit.
metrics.IncrCounter([]string{"client", "rpc"}, 1)
if !c.rpcLimiter.Allow() {
if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded
}
Expand Down Expand Up @@ -305,7 +307,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io

// Enforce the RPC limit.
metrics.IncrCounter([]string{"client", "rpc"}, 1)
if !c.rpcLimiter.Allow() {
if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded
}
Expand Down Expand Up @@ -380,3 +382,10 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
cs := lib.CoordinateSet{c.config.Segment: lan}
return cs, nil
}

// ReloadConfig is used to have the Client do an online reload of
// relevant configuration information
// ReloadConfig is used to have the Client do an online reload of
// relevant configuration information. Only the RPC rate limiter is
// swapped here: a brand-new limiter is built from the updated
// settings and published atomically, so concurrent RPC checks see
// either the old limiter or the new one, never a partial update.
func (c *Client) ReloadConfig(config *Config) error {
	limiter := rate.NewLimiter(config.RPCRate, config.RPCMaxBurst)
	c.rpcLimiter.Store(limiter)
	return nil
}
24 changes: 24 additions & 0 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)

func testClientConfig(t *testing.T) (string, *Config) {
Expand Down Expand Up @@ -665,3 +667,25 @@ func TestClient_Encrypted(t *testing.T) {
t.Fatalf("should be encrypted")
}
}

func TestClient_Reload(t *testing.T) {
t.Parallel()
dir1, c := testClientWithConfig(t, func(c *Config) {
c.RPCRate = 500
c.RPCMaxBurst = 5000
})
defer os.RemoveAll(dir1)
defer c.Shutdown()

limiter := c.rpcLimiter.Load().(*rate.Limiter)
require.Equal(t, rate.Limit(500), limiter.Limit())
require.Equal(t, 5000, limiter.Burst())

c.config.RPCRate = 1000
c.config.RPCMaxBurst = 10000

require.NoError(t, c.ReloadConfig(c.config))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all looks good to me but I wonder if we should have 3 different goroutines all spawn and run this in a loop and verify that go test -race doesn't pick up any concurrency issues?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReloadConfig can't be called concurrently. At least as the code stands today.

Client.ReloadConfig is only called from Agent.ReloadConfig which in turn is only called from cmd.handleReload (command/agent/agent.go) and finally handleReload is only called from a loop in cmd.run which is processing the chan of signals. As it can only handle 1 signal at a time, ReloadConfig can therefore not be called concurrently so there can be no race. I have no doubt that if it was called concurrently there would be several problems (like watches, services etc could be loaded multiple times)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh OK. Maybe I confused the calls, I was mostly wanting to validate the atomic.Value stuff - I guess that is about calling this concurrently with RPC client reading the values so perhaps the valid test would be to start a goroutine (or 10) making RPCs while calling this in a loop or something.

It's not a blocker, it just might be an extra sanity check that there aren't any obvious concurrency issues.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would that really test? To me it looks like that would be testing that atomic.Value.(Load/Store) do what they say they do. And since that is an upstream library (and builtin to the language) I would say that testing that is outside the scope of Consul testing and would only serve to lengthen test times. We should be able to assume that those atomic primitives work as expected. Because the whole limiter is now stored within the value it is impossible to access it without a load/store and since we don't do any write operations into an existing limiter during reload (but rather create a new limiter) there can be no race. Additionally the limiter looks to be thread-safe in that if you did set new limits or perform other operations it will lock itself internally (which is how we can use one limiter with multiple goroutines). Looks to be = I read through the code. I don't think they expressly say in the package documentation that this is the case.

Copy link
Member

@banks banks Jun 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to assume that those atomic primitives work as expected.

It's often the case that atomic values are misused. My suggestion was to guard against any of the common pitfalls of using them!

I don't think it's critical and agree the usage looks simple and correct so merge away but in general go's race detector is easy to use and can often catch subtle and common mistakes/bad assumptions in how sync primitives are used. 😄 .

limiter = c.rpcLimiter.Load().(*rate.Limiter)
require.Equal(t, rate.Limit(1000), limiter.Limit())
require.Equal(t, 10000, limiter.Burst())
}
6 changes: 6 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,12 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
return cs, nil
}

// ReloadConfig is used to have the Server do an online reload of
// relevant configuration information.
//
// It is currently a no-op on the server side: unlike the client,
// the server holds no state that is swapped at reload time here, so
// the new config is accepted without any changes and nil is returned.
func (s *Server) ReloadConfig(config *Config) error {
	return nil
}

// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)
Expand Down
1 change: 1 addition & 0 deletions website/source/docs/agent/options.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -1296,3 +1296,4 @@ items which are reloaded include:
* <a href="#node_meta">Node Metadata</a>
* <a href="#telemetry-prefix_filter">Metric Prefix Filter</a>
* <a href="#discard_check_output">Discard Check Output</a>
* <a href="#limits">RPC rate limiting</a>