Skip to content

Commit

Permalink
Limit GetMetricStatistics to 10 per second
Browse files Browse the repository at this point in the history
closes #1197
  • Loading branch information
sparrc committed May 31, 2016
1 parent 069764f commit 8877318
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ time before a new metric is included by the plugin.
- [#1275](https://github.com/influxdata/telegraf/pull/1275): Allow wildcard filtering of varnish stats.
- [#1142](https://github.com/influxdata/telegraf/pull/1142): Support for glob patterns in exec plugin commands configuration.
- [#1278](https://github.com/influxdata/telegraf/pull/1278): RabbitMQ input: made url parameter optional by using DefaultURL (http://localhost:15672) if not specified
- [#1197](https://github.com/influxdata/telegraf/pull/1197): Limit AWS GetMetricStatistics requests to 10 per second.

### Bugfixes

Expand Down
59 changes: 59 additions & 0 deletions internal/limiter/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package limiter

import (
"sync"
"time"
)

// NewRateLimiter returns a rate limiter that will will emit from the C
// channel only 'n' times every 'rate' seconds.
func NewRateLimiter(n int, rate time.Duration) *rateLimiter {
r := &rateLimiter{
C: make(chan bool),
rate: rate,
n: n,
shutdown: make(chan bool),
}
r.wg.Add(1)
go r.limiter()
return r
}

type rateLimiter struct {
C chan bool
rate time.Duration
n int

shutdown chan bool
wg sync.WaitGroup
}

func (r *rateLimiter) Stop() {
close(r.shutdown)
r.wg.Wait()
close(r.C)
}

func (r *rateLimiter) limiter() {
defer r.wg.Done()
ticker := time.NewTicker(r.rate)
defer ticker.Stop()
counter := 0
for {
select {
case <-r.shutdown:
return
case <-ticker.C:
counter = 0
default:
if counter < r.n {
select {
case r.C <- true:
counter++
case <-r.shutdown:
return
}
}
}
}
}
54 changes: 54 additions & 0 deletions internal/limiter/limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package limiter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestRateLimiter(t *testing.T) {
r := NewRateLimiter(5, time.Second)
ticker := time.NewTicker(time.Millisecond * 75)

// test that we can only get 5 receives from the rate limiter
counter := 0
outer:
for {
select {
case <-r.C:
counter++
case <-ticker.C:
break outer
}
}

assert.Equal(t, 5, counter)
r.Stop()
// verify that the Stop function closes the channel.
_, ok := <-r.C
assert.False(t, ok)
}

func TestRateLimiterMultipleIterations(t *testing.T) {
r := NewRateLimiter(5, time.Millisecond*50)
ticker := time.NewTicker(time.Millisecond * 250)

// test that we can get 15 receives from the rate limiter
counter := 0
outer:
for {
select {
case <-ticker.C:
break outer
case <-r.C:
counter++
}
}

assert.True(t, counter > 10)
r.Stop()
// verify that the Stop function closes the channel.
_, ok := <-r.C
assert.False(t, ok)
}
20 changes: 13 additions & 7 deletions plugins/inputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/internal/limiter"
"github.com/influxdata/telegraf/plugins/inputs"
)

Expand Down Expand Up @@ -170,11 +171,13 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
now := time.Now()

// limit concurrency or we can easily exhaust user connection limit
semaphore := make(chan byte, 64)

// see cloudwatch API request limits:
// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
lmtr := limiter.NewRateLimiter(10, time.Second)
defer lmtr.Stop()
for _, m := range metrics {
semaphore <- 0x1
go c.gatherMetric(acc, m, now, semaphore, errChan)
<-lmtr.C
go c.gatherMetric(acc, m, now, errChan)
}

for i := 1; i <= metricCount; i++ {
Expand Down Expand Up @@ -257,12 +260,16 @@ func (c *CloudWatch) fetchNamespaceMetrics() (metrics []*cloudwatch.Metric, err
/*
* Gather given Metric and emit any error
*/
func (c *CloudWatch) gatherMetric(acc telegraf.Accumulator, metric *cloudwatch.Metric, now time.Time, semaphore chan byte, errChan chan error) {
func (c *CloudWatch) gatherMetric(
acc telegraf.Accumulator,
metric *cloudwatch.Metric,
now time.Time,
errChan chan error,
) {
params := c.getStatisticsInput(metric, now)
resp, err := c.client.GetMetricStatistics(params)
if err != nil {
errChan <- err
<-semaphore
return
}

Expand Down Expand Up @@ -299,7 +306,6 @@ func (c *CloudWatch) gatherMetric(acc telegraf.Accumulator, metric *cloudwatch.M
}

errChan <- nil
<-semaphore
}

/*
Expand Down

0 comments on commit 8877318

Please sign in to comment.