Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple rate limiting for agent rpc calls. #3140

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,15 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.RPCAdvertise = base.RPCAddr
}

// Rate limiting for RPC calls
if a.config.RPCRate > 0 {
base.RPCRate = a.config.RPCRate
}

if a.config.RPCMaxBurst > 0 {
base.RPCMaxBurst = a.config.RPCMaxBurst
}

// set the src address for outgoing rpc connections
// Use port 0 so that outgoing connections use a random port.
if !ipaddr.IsAny(base.RPCAddr.IP) {
Expand Down
19 changes: 19 additions & 0 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-sockaddr/template"
"github.com/mitchellh/mapstructure"
"golang.org/x/time/rate"
)

// Ports is used to simplify the configuration by
Expand Down Expand Up @@ -785,6 +786,14 @@ type Config struct {
SessionTTLMin time.Duration `mapstructure:"-"`
SessionTTLMinRaw string `mapstructure:"session_ttl_min"`

// Rate limiter controls how frequently rpc calls are allowed to happen.
// In any large enough time interval, rate limiter limits the rate to RPCRate tokens per second,
// with a maximum burst size of RPCMaxBurst events.
// As a special case, if RPCRate == Inf (the infinite rate), RPCMaxBurst is ignored.
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
RPCRate rate.Limit `mapstructure:"rpc_rate"`
RPCMaxBurst int `mapstructure:"rpc_max_burst"`

// deprecated fields
// keep them exported since otherwise the error messages don't show up
DeprecatedAtlasInfrastructure string `mapstructure:"atlas_infrastructure" json:"-"`
Expand Down Expand Up @@ -972,6 +981,9 @@ func DefaultConfig() *Config {
RetryInterval: 30 * time.Second,
RetryIntervalWan: 30 * time.Second,

RPCRate: rate.Inf,
RPCMaxBurst: 1000,

TLSMinVersion: "tls10",

EncryptVerifyIncoming: Bool(true),
Expand Down Expand Up @@ -2048,6 +2060,13 @@ func MergeConfig(a, b *Config) *Config {
result.SessionTTLMinRaw = b.SessionTTLMinRaw
}

if b.RPCRate > 0 {
result.RPCRate = b.RPCRate
}
if b.RPCMaxBurst > 0 {
result.RPCMaxBurst = b.RPCMaxBurst
}

result.HTTPConfig.BlockEndpoints = append(a.HTTPConfig.BlockEndpoints,
b.HTTPConfig.BlockEndpoints...)
if len(b.HTTPConfig.ResponseHeaders) > 0 {
Expand Down
8 changes: 8 additions & 0 deletions agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,14 @@ func TestDecodeConfig(t *testing.T) {
in: `{"retry_max_wan":123}`,
c: &Config{RetryMaxAttemptsWan: 123},
},
{
in: `{"rpc_rate": 100}`,
c: &Config{RPCRate: 100},
},
{
in: `{"rpc_max_burst": 50}`,
c: &Config{RPCMaxBurst: 50},
},
{
in: `{"serf_lan_bind":"1.2.3.4"}`,
c: &Config{SerfLanBindAddr: "1.2.3.4"},
Expand Down
12 changes: 12 additions & 0 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -70,6 +72,8 @@ type Client struct {
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex

rpcLimiter *rate.Limiter
}

// NewClient is used to construct a new Consul client from the
Expand Down Expand Up @@ -341,6 +345,14 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
return structs.ErrNoServers
}

metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1)

// Check rate
if !c.rpcLimiter.Allow() {
metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded
}

// Forward to remote Consul
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil {
c.routers.NotifyFailedServer(server)
Expand Down
12 changes: 12 additions & 0 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -293,6 +294,14 @@ type Config struct {
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration

// Rate limiter controls how frequently rpc calls are allowed to happen.
// In any large enough time interval, rate limiter limits the rate to RPCRate tokens per second,
// with a maximum burst size of RPCMaxBurst events.
// As a special case, if RPCRate == Inf (the infinite rate), RPCMaxBurst is ignored.
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
RPCRate rate.Limit
RPCMaxBurst int

// AutopilotConfig is used to apply the initial autopilot config when
// bootstrapping.
AutopilotConfig *structs.AutopilotConfig
Expand Down Expand Up @@ -376,6 +385,9 @@ func DefaultConfig() *Config {
// than enough when running in the high performance mode.
RPCHoldTimeout: 7 * time.Second,

RPCRate: rate.Inf,
RPCMaxBurst: 1000,

TLSMinVersion: "tls10",

AutopilotConfig: &structs.AutopilotConfig{
Expand Down
1 change: 1 addition & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
ErrNoDCPath = fmt.Errorf("No path to datacenter")
ErrNoServers = fmt.Errorf("No known Consul servers")
ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads")
ErrRPCRateExceeded = fmt.Errorf("RPC rate limit exceeded")
)

type MessageType uint8
Expand Down
27 changes: 27 additions & 0 deletions vendor/golang.org/x/time/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions vendor/golang.org/x/time/PATENTS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading