Commit 772fc1b

add cache throttler
fix cache scale units
fix throttler tests
update doc
1pkg committed Oct 19, 2020
1 parent a21a7b7 commit 772fc1b
Showing 7 changed files with 95 additions and 14 deletions.
1 change: 1 addition & 0 deletions README.MD
@@ -160,6 +160,7 @@ thr := NewThrottlerAll( // throttles only if all children throttle
| not | `func NewThrottlerNot(thr Throttler) Throttler` | Throttles call if provided throttler doesn't throttle. |
| suppress | `func NewThrottlerSuppress(thr Throttler) Throttler` | Suppresses provided throttler to never throttle. |
| retry | `func NewThrottlerRetry(thr Throttler, retries uint64) Throttler` | Retries provided throttler error up until the provided retries threshold.<br> Internally retry uses square throttler with `DefaultRetriedDuration` initial duration. |
| cache | `func NewThrottlerCache(thr Throttler, cache time.Duration) Throttler` | Caches provided throttler calls for the provided cache duration; a throttler release resets the cache.<br> Only non-throttling calls are cached for the provided cache duration. See the usage sketch below. |
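A minimal usage sketch of the new cache throttler; the threshold, cache duration, and `ctx` are placeholders:

```go
// Wrap an existing throttler so a non-throttling decision is reused
// for up to a minute instead of re-evaluating the underlying throttler.
thr := NewThrottlerCache(NewThrottlerAfter(1000), time.Minute)
if err := thr.Acquire(ctx); err != nil {
	// throttled: throttling decisions are not cached
	return err
}
defer thr.Release(ctx) // releasing also resets the cache
```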

## Integrations

2 changes: 2 additions & 0 deletions enqueuers.go
@@ -31,6 +31,7 @@ type enqrabbit struct {
// which enqueues provided message to the specified queue.
// New unique exchange `gohalt_exchange_{{uuid}}` is created for each new enqueuer,
// new unique message id `gohalt_enqueue_{{uuid}}` is created for each new message.
// Only successful connections are cached.
func NewEnqueuerRabbit(url string, queue string, retries uint64) Enqueuer {
exchange := fmt.Sprintf("gohalt_exchange_%s", uuid.NewV4())
enq := &enqrabbit{}
@@ -121,6 +122,7 @@ type enqkafka struct {
// with cached connection and failure retries
// which enqueues provided message to the specified topic.
// New unique message key `gohalt_enqueue_{{uuid}}` is created for each new message.
// Only successful connections are cached.
func NewEnqueuerKafka(net string, url string, topic string, retries uint64) Enqueuer {
enq := &enqkafka{}
memconnect, reset := cached(0, func(ctx context.Context) error {
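For orientation, a hedged usage sketch of the Kafka enqueuer; the `Enqueue` method name and `[]byte` payload are assumptions about the package's `Enqueuer` interface rather than something shown in this diff, and the broker address and topic are placeholders:

```go
// The first successful call dials Kafka and caches the connection;
// a failed dial is not cached, so the next call retries connecting.
enq := NewEnqueuerKafka("tcp", "localhost:9092", "some_topic", 3)
if err := enq.Enqueue(context.Background(), []byte("payload")); err != nil { // assumed method signature
	log.Println("enqueue failed; the connection will be re-dialed on the next call:", err)
}
```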
2 changes: 1 addition & 1 deletion executors.go
@@ -59,7 +59,7 @@ func cached(cache time.Duration, run Runnable) (cached Runnable, reset Runnable)
var lock uint64
cached = func(ctx context.Context) error {
ts := atomicGet(&lock)
now := uint64(time.Now().UTC().Unix())
now := uint64(time.Now().UTC().UnixNano())
// on first call run no matter what
if ts == 0 {
if err := run(ctx); err != nil {
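The `Unix()` to `UnixNano()` change is the "fix cache scale units" part of the commit: the stored stamp is compared against the `cache` duration, and `time.Duration` counts nanoseconds, so a seconds-scale stamp would not line up with it. A self-contained sketch of the same memoization idea — not the package's exact implementation:

```go
package sketch

import (
	"context"
	"sync/atomic"
	"time"
)

// Runnable mirrors the package's runnable signature.
type Runnable func(context.Context) error

// cachedSketch memoizes a successful run for the given duration.
// The stamp is stored in nanoseconds so it is directly comparable
// with time.Duration, which is also a nanosecond count.
func cachedSketch(cache time.Duration, run Runnable) Runnable {
	var stamp int64
	return func(ctx context.Context) error {
		now := time.Now().UTC().UnixNano()
		if ts := atomic.LoadInt64(&stamp); ts != 0 && now-ts < int64(cache) {
			return nil // still within the cache window
		}
		if err := run(ctx); err != nil {
			return err // failed runs are not cached
		}
		atomic.StoreInt64(&stamp, now)
		return nil
	}
}
```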
3 changes: 2 additions & 1 deletion metrics.go
@@ -27,7 +27,8 @@ type mtcprometheus struct {

// NewMetricPrometheus creates prometheus metric querier instance
// with cache interval defined by the provided duration
// which executes provided prometheus boolean metric query.
// which executes provided prometheus boolean metric query and caches it.
// Only successful metric results are cached.
func NewMetricPrometheus(url string, query string, cache time.Duration) Metric {
mtc := &mtcprometheus{}
var api prometheus.API
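A hedged usage sketch; the Prometheus URL and query are placeholders, and the `NewThrottlerMetric` wiring is an assumption about how a `Metric` is consumed rather than something shown in this diff:

```go
// Evaluate a boolean Prometheus expression at most once per cache interval;
// only successful query results are reused within the interval.
mtc := NewMetricPrometheus("http://localhost:9090", `go_goroutines > 1000`, 5*time.Second)
thr := NewThrottlerMetric(mtc) // assumed wiring into a throttler
```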
1 change: 1 addition & 0 deletions monitors.go
@@ -38,6 +38,7 @@ type mntsys struct {
// NewMonitorSystem creates system monitor instance
// with cache interval defined by the provided duration
// and time to process CPU utilization.
// Only successful stats results are cached.
func NewMonitorSystem(cache time.Duration, tp time.Duration) Monitor {
mnt := &mntsys{}
memsync, _ := cached(cache, func(ctx context.Context) error {
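Similarly for the system monitor, a brief hedged sketch; the durations are placeholders and the downstream threshold-based throttler wiring is omitted because its exact shape isn't shown here:

```go
// Collect system stats at most once per second, sampling CPU utilization
// over a 500ms window; only successful collections are cached, so a failed
// probe is retried on the next call.
mnt := NewMonitorSystem(time.Second, 500*time.Millisecond)
```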
39 changes: 35 additions & 4 deletions throttlers.go
@@ -366,7 +366,8 @@ func (thr ttimed) Acquire(ctx context.Context) error {
}

func (thr ttimed) Release(ctx context.Context) error {
return thr.tafter.Release(ctx)
_ = thr.tafter.Release(ctx)
return nil
}

type tlatency struct {
@@ -597,7 +598,8 @@ func (thr *tadaptive) Acquire(ctx context.Context) error {
}

func (thr tadaptive) Release(ctx context.Context) error {
return thr.ttimed.Release(ctx)
_ = thr.ttimed.Release(ctx)
return nil
}

// Pattern defines a pair of regexp and related throttler.
@@ -628,7 +630,8 @@ func (thr tpattern) Acquire(ctx context.Context) error {
func (thr tpattern) Release(ctx context.Context) error {
for _, pattern := range thr {
if key := ctxKey(ctx); pattern.Pattern.MatchString(key) {
return pattern.Throttler.Release(ctx)
_ = pattern.Throttler.Release(ctx)
return nil
}
}
return nil
@@ -659,7 +662,7 @@ func (thr *tring) Release(ctx context.Context) error {
if length := len(thr.thrs); length > 0 {
release := atomicIncr(&thr.release) - 1
index := int(release) % length
return thr.thrs[index].Release(ctx)
_ = thr.thrs[index].Release(ctx)
}
return nil
}
@@ -793,3 +796,31 @@ func (thr tretry) Release(ctx context.Context) error {
_ = thr.thr.Release(ctx)
return nil
}

type tcache struct {
thr Throttler
acquire Runnable
reset Runnable
}

// NewThrottlerCache creates a new throttler instance that
// caches provided throttler calls for the provided cache duration;
// a throttler release resets the cache.
// Only non-throttling calls are cached for the provided cache duration.
func NewThrottlerCache(thr Throttler, cache time.Duration) Throttler {
tcache := tcache{thr: thr}
tcache.acquire, tcache.reset = cached(cache, func(ctx context.Context) error {
return thr.Acquire(ctx)
})
return tcache
}

func (thr tcache) Acquire(ctx context.Context) error {
return thr.acquire(ctx)
}

func (thr tcache) Release(ctx context.Context) error {
_ = thr.thr.Release(ctx)
_ = thr.reset(ctx)
return nil
}
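To make the release-resets-cache behavior concrete, a short hedged usage sketch (the threshold, cache duration, and context are illustrative):

```go
ctx := context.Background()
thr := NewThrottlerCache(NewThrottlerAfter(1), time.Minute)

_ = thr.Acquire(ctx) // consults the underlying throttler; a non-throttling result is cached
_ = thr.Acquire(ctx) // served from the cache, the underlying throttler is not consulted
_ = thr.Release(ctx) // releases the underlying throttler and resets the cache
_ = thr.Acquire(ctx) // cache was reset, so the underlying throttler is consulted again
```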
61 changes: 53 additions & 8 deletions throttlers_test.go
@@ -9,7 +9,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
@@ -25,19 +25,21 @@ const (
ms30_0 time.Duration = 30 * time.Millisecond
)

var trun Runner = NewRunnerAsync(context.Background(), NewThrottlerBuffered(0))
var trun Runner = NewRunnerSync(context.Background(), NewThrottlerBuffered(1))

type tcase struct {
tms uint64 // number of sub runs inside one case
thr Throttler // throttler itself
acts []Runnable // actions that need to be throttled
ins []Runnable // actions that need to be run inside throttle
pres []Runnable // actions that need to be run before throttle
tss []time.Duration // timestamps that need to be applied to contexts set
ctxs []context.Context // contexts set for throttling
errs []error // expected throttler errors
durs []time.Duration // expected throttler durations
idx uint64 // carries seq number of sub run execution
over bool // if throttler needs to be over released
pass bool // if throttler doesn't need to be released
}

func (t *tcase) run(index int) (dur time.Duration, err error) {
@@ -71,6 +73,12 @@ func (t *tcase) run(index int) (dur time.Duration, err error) {
ctx = WithTimestamp(ctx, time.Now().Add(t.tss[index]))
}
err = t.thr.Acquire(ctx)
// run additional ins action only if present
if index < len(t.ins) {
if in := t.ins[index]; in != nil {
_ = in(ctx)
}
}
}()
dur = time.Since(ts)
// run additional action only if present
@@ -83,6 +91,9 @@ func (t *tcase) run(index int) (dur time.Duration, err error) {
if t.over && uint64(index+1) == t.tms { // imitate over releasing on last call
limit = index + 1
}
if t.pass {
limit = 0
}
for i := 0; i < limit; i++ {
if err := t.thr.Release(ctx); err != nil {
return dur, err
@@ -783,7 +794,7 @@ func TestThrottlers(t *testing.T) {
tms: 3,
thr: NewThrottlerSuppress(NewThrottlerEcho(nil)),
},
"Throttler retry should throttle on recuring internal error": {
"Throttler retry should throttle on recurring internal error": {
tms: 3,
thr: NewThrottlerRetry(NewThrottlerEcho(errors.New("test")), 2),
errs: []error{
@@ -792,13 +803,47 @@
errors.New("test"),
},
},
"Throttler retry should not throttle on retried recuring internal error": {
"Throttler retry should not throttle on retried recurring internal error": {
tms: 3,
thr: NewThrottlerRetry(NewThrottlerAfter(3), 2),
thr: NewThrottlerRetry(NewThrottlerBefore(3), 2),
errs: []error{
errors.New("throttler has not reached threshold yet"),
nil,
nil,
},
},
"Throttler cache should not throttle on cached throttler": {
tms: 3,
thr: NewThrottlerCache(NewThrottlerAfter(1), ms30_0),
errs: []error{
errors.New("test"),
nil,
nil,
nil,
},
pass: true,
},
"Throttler cache should throttle on close to zero cached throttler": {
tms: 3,
thr: NewThrottlerCache(NewThrottlerAfter(2), ms1_0),
ins: []Runnable{
delayed(ms2_0, nope),
delayed(ms2_0, nope),
delayed(ms2_0, nope),
},
errs: []error{
nil,
nil,
errors.New("throttler has exceed threshold"),
},
pass: true,
},
"Throttler cache should throttle on cached throttler": {
tms: 3,
thr: NewThrottlerCache(NewThrottlerAfter(1), ms30_0),
errs: []error{
nil,
errors.New("throttler has exceed threshold"),
errors.New("throttler has exceed threshold"),
},
},
}
@@ -813,8 +858,8 @@
rdur, rerr := tcase.result(index)
dur, err := tcase.run(index)
trun.Run(func(context.Context) error {
assert.Equal(t, rerr, err)
assert.LessOrEqual(t, int64(rdur/2), int64(dur))
require.Equal(t, rerr, err)
require.LessOrEqual(t, int64(rdur/2), int64(dur))
return nil
})
}(i, &ptrtcase)
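The "close to zero cached throttler" case above relies on each `ins` runnable sleeping past the 1ms cache window before the next acquire; the same timing in a condensed, illustrative form:

```go
ctx := context.Background()
thr := NewThrottlerCache(NewThrottlerAfter(2), time.Millisecond)

_ = thr.Acquire(ctx)             // underlying throttler call #1, result cached
time.Sleep(2 * time.Millisecond) // cache window (1ms) expires
_ = thr.Acquire(ctx)             // cache stale: underlying throttler call #2
time.Sleep(2 * time.Millisecond)
err := thr.Acquire(ctx)          // underlying call #3 exceeds the After(2) threshold
_ = err                          // expected to be a throttling error, as in the test above
```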
