Skip to content

Commit

Permalink
Ratelimiter usage-counting bugfix: rejected reservations were not cou…
Browse files Browse the repository at this point in the history
…nted (cadence-workflow#6158)

After seeing that we had rejected requests but no rejections in the global-limit numbers, I tracked it down to this flaw.
Basically it's just that not-allowed requests were not being counted in the fallback-limiter, so it only affected the global system.

Since "counted" and "fallback" have separate implementations, and Reserve is easier to get wrong (as demonstrated) yet rather important because it's heavily used in the frontend per-domain requests, I've added a test that checks all three limiters (counted, shadowed, and fallback) to make sure they track calls like they should.

As part of this, to share some types a bit more and to normalize the tests a bit, I moved the fallback-limiter test out of the external test package (`internal_test`) and into the `internal` package itself.  AFAICT the external package was working fine, but it's definitely a bit of an abnormality in this codebase, and I'm not confident that it works correctly with coverage systems (I just haven't checked).  Might as well simplify.

---

In terms of concrete changes, there are really only two here:
- fallback-limiter's Reserve should have called Used(false) and counted the rejection.
- "not idle" is now done at `Used(..)` time, not `Reserve()`

The rest is tests and minor refactoring to simplify it as much as possible.
  • Loading branch information
Groxx authored Jul 5, 2024
1 parent dc05a78 commit a6935a9
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,10 @@ func (c CountedLimiter) Wait(ctx context.Context) error {
}

func (c CountedLimiter) Reserve() clock.Reservation {
c.usage.idle.Store(0) // not idle regardless of the result

res := c.wrapped.Reserve()
if res.Allow() {
// may be used or canceled, return a wrapped version so it's tracked
// when we know which it is.
return countedReservation{
wrapped: res,
usage: c.usage,
}
return countedReservation{
wrapped: c.wrapped.Reserve(),
usage: c.usage,
}
// cannot be used, just count the rejection immediately
// and return the original so it's a bit cheaper
c.usage.rejected.Add(1)
return res
}

func (c CountedLimiter) Collect() UsageMetrics {
Expand All @@ -100,15 +89,30 @@ func (c countedReservation) Allow() bool {
}

func (c countedReservation) Used(wasUsed bool) {
if wasUsed {
c.usage.allowed.Add(1)
}
c.wrapped.Used(wasUsed)
c.usage.idle.Store(0)
if c.Allow() {
if wasUsed {
// only counts as allowed if used, else it is hopefully rolled back.
// this may or may not restore the token, but it does imply "this limiter did not limit the event".
c.usage.Count(true)
}

// else it was canceled, and not "used".
//
// currently these are not tracked because some other rejection will occur
// and be emitted in all our current uses, but with bad enough luck or
// latency before canceling it could lead to misleading metrics.
// else it was canceled, and not "used".
//
// currently these are not tracked because some other rejection will occur
// and be emitted in all our current uses, but with bad enough luck or
// latency before canceling it could lead to misleading metrics.
} else {
// these reservations cannot be waited on so they cannot become allowed,
// and they cannot be returned, so they are always rejected.
//
// specifically: it is likely that `wasUsed == Allow()`, so false cannot be
// trusted to mean "will not use for some other reason", and the underlying
// rate.Limiter did not change state anyway because it returned the
// pending-token before becoming a clock.Reservation.
c.usage.Count(false)
}
}

func (a *AtomicUsage) Count(allowed bool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ package internal

import (
"context"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
)

func TestUsage(t *testing.T) {
Expand Down Expand Up @@ -90,10 +92,87 @@ func TestUsage(t *testing.T) {

r = lim.Reserve()
assert.False(t, r.Allow(), "should not have a token available")
assert.Equal(t, UsageMetrics{0, 1, 0}, lim.Collect(), "not-allowed reservations immediately count rejection")
r.Used(false)
assert.Equal(t, UsageMetrics{0, 1, 0}, lim.Collect(), "not-allowed reservations count as rejection")
})
}

// TestRegression_ReserveCountsCorrectly checks that usage reported by the
// counted, shadowed, and fallback limiters matches what actually happened
// when callers go through Reserve() and then Used(...).
//
// Each subtest feeds the same driver a limiter, a way to advance mock time,
// and a way to collect the usage metrics that limiter produced.
func TestRegression_ReserveCountsCorrectly(t *testing.T) {
	// exercise reserves repeatedly until more than 3 rejections were seen,
	// tallying what it did, and then compares its tallies to collect().
	exercise := func(t *testing.T, lim quotas.Limiter, advance func(time.Duration), collect func() UsageMetrics) {
		var allowed, returned, rejected int

		// normal exit is the loop condition: a few rejects occurred
		// (more than 1, just to be more interesting).
		for attempt := 0; rejected <= 3; attempt++ {
			if attempt > 1_000 {
				// runaway-loop guard, since an endless loop here is a real mess to debug
				t.Error("too many attempts, test is not sane. allowed:", allowed, "rejected:", rejected, "returned:", returned)
				break
			}

			res := lim.Reserve()

			// the test is not concurrent, so advancing time between reserving
			// and resolving should make no difference.  only do it on some
			// attempts so both orderings are exercised.
			if rand.Intn(2) == 0 {
				advance(time.Millisecond)
			}

			evenAttempt := attempt%2 == 0
			switch {
			case !res.Allow():
				// rejected: callers are expected to pass false here, but passing
				// true should not be required to behave differently, so try both.
				rejected++
				res.Used(evenAttempt)
			case evenAttempt:
				// allowed and consumed
				allowed++
				res.Used(true)
			default:
				// allowed but handed back unused
				returned++
				res.Used(false)
			}
		}

		usage := collect()
		t.Logf("usage: %#v", usage)
		assert.NotZero(t, allowed, "should have allowed some requests")
		assert.Equal(t, allowed, usage.Allowed, "wrong num of requests allowed")
		assert.Equal(t, rejected, usage.Rejected, "wrong num of requests rejected")
		assert.Equal(t, 0, usage.Idle, "limiter should never be idle in this test")
	}

	t.Run("counted", func(t *testing.T) {
		// the plain counting limiter must count correctly on its own
		mockTime := clock.NewMockedTimeSource()
		counted := NewCountedLimiter(clock.NewMockRatelimiter(mockTime, 1, 100))

		exercise(t, counted, mockTime.Advance, counted.Collect)
	})
	t.Run("shadowed", func(t *testing.T) {
		// the shadowed limiter must, at minimum, drive its primary correctly,
		// so usage is collected from the counted primary rather than the wrapper
		mockTime := clock.NewMockedTimeSource()
		primary := NewCountedLimiter(clock.NewMockRatelimiter(mockTime, 1, 100))
		shadowed := NewShadowedLimiter(primary, allowlimiter{})

		exercise(t, shadowed, mockTime.Advance, primary.Collect)
	})
	t.Run("fallback", func(t *testing.T) {
		// the fallback limiter has its own implementation, but its counts
		// should be exactly the same as the others.
		// TODO: ideally it would actually be the same code, but that's a bit awkward due to needing different interfaces.
		mockTime := clock.NewMockedTimeSource()
		mockLimiter := clock.NewMockRatelimiter(mockTime, 1, 100)
		fallback := NewFallbackLimiter(allowlimiter{})
		fallback.Update(1)             // allows using primary, else it calls the fallback
		fallback.primary = mockLimiter // cheat, just swap it out

		usageOnly := func() UsageMetrics {
			usage, _, _ := fallback.Collect()
			return usage
		}
		exercise(t, fallback, mockTime.Advance, usageOnly)
	})
}

// Wait-based tests can block forever if there's an issue, better to fail fast.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,8 @@ func (b *FallbackLimiter) Wait(ctx context.Context) error {
}

func (b *FallbackLimiter) Reserve() clock.Reservation {
res := b.both().Reserve()
return countedReservation{
wrapped: res,
wrapped: b.both().Reserve(),
usage: &b.usage,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package internal_test
package internal

import (
"context"
Expand All @@ -35,27 +35,26 @@ import (

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/quotas/global/collection/internal"
)

func TestLimiter(t *testing.T) {
t.Run("uses fallback initially", func(t *testing.T) {
m := quotas.NewMockLimiter(gomock.NewController(t))
m.EXPECT().Allow().Times(1).Return(true)
m.EXPECT().Allow().Times(2).Return(false)
lim := internal.NewFallbackLimiter(m)
lim := NewFallbackLimiter(m)

assert.True(t, lim.Allow(), "should return fallback's first response")
assert.False(t, lim.Allow(), "should return fallback's second response")
assert.False(t, lim.Allow(), "should return fallback's third response")

usage, starting, failing := lim.Collect()
assert.Equal(t, internal.UsageMetrics{1, 2, 0}, usage, "usage metrics should match returned values")
assert.Equal(t, UsageMetrics{1, 2, 0}, usage, "usage metrics should match returned values")
assert.True(t, starting, "should still be starting up")
assert.False(t, failing, "should not be failing, still starting up")
})
t.Run("uses primary after update", func(t *testing.T) {
lim := internal.NewFallbackLimiter(allowlimiter{})
lim := NewFallbackLimiter(allowlimiter{})
lim.Update(1_000_000) // large enough to allow millisecond sleeps to refill

time.Sleep(time.Millisecond) // allow some tokens to fill
Expand All @@ -65,11 +64,11 @@ func TestLimiter(t *testing.T) {
usage, startup, failing := lim.Collect()
assert.False(t, failing, "should not use fallback limiter after update")
assert.False(t, startup, "should not be starting up, has had an update")
assert.Equal(t, internal.UsageMetrics{2, 0, 0}, usage, "usage should match behavior")
assert.Equal(t, UsageMetrics{2, 0, 0}, usage, "usage should match behavior")
})

t.Run("collecting usage data resets counts", func(t *testing.T) {
lim := internal.NewFallbackLimiter(allowlimiter{})
lim := NewFallbackLimiter(allowlimiter{})
lim.Update(1)
lim.Allow()
limit, _, _ := lim.Collect()
Expand All @@ -88,7 +87,7 @@ func TestLimiter(t *testing.T) {
})

t.Run("falls back after too many failures", func(t *testing.T) {
lim := internal.NewFallbackLimiter(allowlimiter{}) // fallback behavior is ignored
lim := NewFallbackLimiter(allowlimiter{}) // fallback behavior is ignored
lim.Update(1)
_, startup, failing := lim.Collect()
require.False(t, failing, "should not be using fallback")
Expand All @@ -112,7 +111,7 @@ func TestLimiter(t *testing.T) {
assert.True(t, lim.Allow(), "should return fallback's allowed request")
})
t.Run("failing many times does not accidentally switch away from startup mode", func(t *testing.T) {
lim := internal.NewFallbackLimiter(nil)
lim := NewFallbackLimiter(nil)
for i := 0; i < maxFailedUpdates*10; i++ {
lim.FailedUpdate()
_, startup, failing := lim.Collect()
Expand All @@ -124,14 +123,14 @@ func TestLimiter(t *testing.T) {

t.Run("coverage", func(t *testing.T) {
// easy line to cover to bring to 100%
lim := internal.NewFallbackLimiter(nil)
lim := NewFallbackLimiter(nil)
lim.Update(1)
lim.Update(1) // should go down "no changes needed, return early" path
})
}

func TestLimiterNotRacy(t *testing.T) {
lim := internal.NewFallbackLimiter(allowlimiter{})
lim := NewFallbackLimiter(allowlimiter{})
var g errgroup.Group
const loops = 1000
for i := 0; i < loops; i++ {
Expand Down

0 comments on commit a6935a9

Please sign in to comment.