Skip to content

Commit

Permalink
bounded percentiles throttler implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
1pkg committed Sep 28, 2020
1 parent 1b50937 commit b538f3e
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 67 deletions.
4 changes: 2 additions & 2 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ func maxsearch(ctx context.Context, s searcher, topics []string, max uint64) (ur
| buffered | `NewThrottlerBuffered(threshold uint64)` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until until the running quota is available again. |
| priority | `NewThrottlerPriority(threshold uint64, levels uint8)` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until until the running quota is available again.<br> Running quota is not equally distributed between *n* levels of priority defined by the specified levels.<br> Use `WithPriority(ctx context.Context, priority uint8) context.Context` to override context call priority, *1* by default. |
| timed | `NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration)` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.<br> Periodically each specified interval the running quota number is reseted.<br> If quantum is set then quantum will be used instead of interval to provide the running quota delta updates. |
| latency | `NewThrottlerLatency(threshold time.Duration, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once.<br> If retention is set then throttler state will be reseted after retention duration. |
| percentile | `NewThrottlerPercentile(threshold time.Duration, percentile float64, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once considering the specified percentile.<br> If retention is set then throttler state will be reseted after retention duration. |
| latency | `NewThrottlerLatency(threshold time.Duration, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once.<br> If retention is set then throttler state will be reseted after retention duration.<br> Use `WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*. |
| percentile | `NewThrottlerPercentile(threshold time.Duration, capacity uint8, percentile float64, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once considering the specified percentile.<br> Percentile values are kept in bounded buffer with capacity *c* defined by the specified capacity. <br> If retention is set then throttler state will be reseted after retention duration.<br> Use `WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*. |
| monitor | `NewThrottlerMonitor(mnt Monitor, threshold Stats)` | Throttles call if any of the stats returned by the provided monitor exceeds any of the stats defined by the specified threshold or if any internal error occurred. |
| metric | `NewThrottlerMetric(mtc Metric)` | Throttles call if metric defined by the specified metric is riched or if any internal error occurred. |
| enqueuer | `NewThrottlerEnqueue(enq Enqueuer)` | Always enqueues message to the specified queue throttles only if any internal error occurred.<br> Use `WithData(ctx context.Context, data interface{}) context.Context` to specify context data for enqueued message and `WithMarshaler(ctx context.Context, mrsh Marshaler) context.Context` to specify context data marshaler. |
Expand Down
55 changes: 0 additions & 55 deletions heap.go

This file was deleted.

46 changes: 46 additions & 0 deletions percentiles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package gohalt

import (
"math"
"sort"
"sync"
)

type percentiles struct {
buf []uint64
cap uint8
lock sync.Mutex
}

func (p *percentiles) Len() int {
p.lock.Lock()
defer p.lock.Unlock()
return len(p.buf)
}

func (p *percentiles) Push(dim uint64) {
p.lock.Lock()
defer p.lock.Unlock()
if len(p.buf) >= int(p.cap) {
p.buf = p.buf[1:]
}
p.buf = append(p.buf, dim)
}

func (p *percentiles) At(pval float64) uint64 {
p.lock.Lock()
defer p.lock.Unlock()
buf := make([]uint64, len(p.buf))
_ = copy(buf, p.buf)
sort.Slice(buf, func(i, j int) bool {
return buf[i] < buf[j]
})
at := int(math.Round(float64(len(buf)-1) * pval))
return buf[at]
}

func (p *percentiles) Prune() {
p.lock.Lock()
defer p.lock.Unlock()
p.buf = make([]uint64, 0, p.cap)
}
16 changes: 8 additions & 8 deletions throttlers.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gohalt

import (
"container/heap"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -392,18 +391,20 @@ func (thr *tlatency) Release(ctx context.Context) error {

type tpercentile struct {
reset Runnable
latencies *blatheap
latencies *percentiles
threshold time.Duration
percentile float64
retention time.Duration
}

func NewThrottlerPercentile(threshold time.Duration, percentile float64, retention time.Duration) tpercentile {
func NewThrottlerPercentile(threshold time.Duration, capacity uint8, percentile float64, retention time.Duration) tpercentile {
percentile = math.Abs(percentile)
if percentile > 1.0 {
percentile = 1.0
}
thr := tpercentile{latencies: &blatheap{}, threshold: threshold, percentile: percentile, retention: retention}
thr := tpercentile{threshold: threshold, percentile: percentile, retention: retention}
thr.latencies = &percentiles{cap: capacity}
thr.latencies.Prune()
thr.reset = locked(
delayed(thr.retention, func(context.Context) error {
thr.latencies.Prune()
Expand All @@ -414,9 +415,8 @@ func NewThrottlerPercentile(threshold time.Duration, percentile float64, retenti
}

func (thr tpercentile) Acquire(ctx context.Context) error {
if length := thr.latencies.Len(); length > 0 {
at := int(math.Round(float64(length-1) * thr.percentile))
if latency := thr.latencies.At(at); latency >= uint64(thr.threshold) {
if thr.latencies.Len() > 0 {
if latency := thr.latencies.At(thr.percentile); latency >= uint64(thr.threshold) {
gorun(ctx, thr.reset)
return errors.New("throttler has exceed latency threshold")
}
Expand All @@ -428,7 +428,7 @@ func (thr tpercentile) Release(ctx context.Context) error {
nowTs := time.Now().UTC().UnixNano()
ctxTs := ctxTimestamp(ctx).UnixNano()
latency := uint64(nowTs - ctxTs)
heap.Push(thr.latencies, latency)
thr.latencies.Push(latency)
return nil
}

Expand Down
21 changes: 19 additions & 2 deletions throttlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func TestThrottlers(t *testing.T) {
},
"Throttler percentile should throttle on latency above threshold": {
tms: 5,
thr: NewThrottlerPercentile(ms3_0, 0.5, ms7_0),
thr: NewThrottlerPercentile(ms3_0, 10, 0.5, ms7_0),
tss: []time.Duration{
ms0_0,
-ms5_0,
Expand All @@ -451,7 +451,7 @@ func TestThrottlers(t *testing.T) {
},
"Throttler percentile should throttle on latency above threshold after retention": {
tms: 5,
thr: NewThrottlerPercentile(ms3_0, 1.5, ms5_0),
thr: NewThrottlerPercentile(ms3_0, 10, 1.5, ms5_0),
tss: []time.Duration{
ms0_0,
-ms5_0,
Expand All @@ -473,6 +473,23 @@ func TestThrottlers(t *testing.T) {
nil,
},
},
"Throttler percentile should throttle on latency above threshold with capacity": {
tms: 5,
thr: NewThrottlerPercentile(ms3_0, 1, 0.5, ms7_0),
tss: []time.Duration{
ms0_0,
-ms5_0,
-ms1_0,
-ms5_0,
},
errs: []error{
nil,
nil,
errors.New("throttler has exceed latency threshold"),
nil,
errors.New("throttler has exceed latency threshold"),
},
},
"Throttler adaptive should throttle on throttling adoptee": {
tms: 3,
thr: NewThrottlerAdaptive(
Expand Down

0 comments on commit b538f3e

Please sign in to comment.