diff --git a/README.MD b/README.MD index cfa86b2..7b0c8cd 100644 --- a/README.MD +++ b/README.MD @@ -154,8 +154,8 @@ func maxsearch(ctx context.Context, s searcher, topics []string, max uint64) (ur | buffered | `NewThrottlerBuffered(threshold uint64)` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until until the running quota is available again. | | priority | `NewThrottlerPriority(threshold uint64, levels uint8)` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until until the running quota is available again.
Running quota is not equally distributed between *n* levels of priority defined by the specified levels.
Use `WithPriority(ctx context.Context, priority uint8) context.Context` to override context call priority, *1* by default. | | timed | `NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration)` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.
Periodically each specified interval the running quota number is reseted.
If quantum is set then quantum will be used instead of interval to provide the running quota delta updates. | -| latency | `NewThrottlerLatency(threshold time.Duration, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once.
If retention is set then throttler state will be reseted after retention duration. | -| percentile | `NewThrottlerPercentile(threshold time.Duration, percentile float64, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once considering the specified percentile.
If retention is set then throttler state will be reseted after retention duration. | +| latency | `NewThrottlerLatency(threshold time.Duration, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once.
If retention is set then throttler state will be reseted after retention duration.
Use `WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*. | +| percentile | `NewThrottlerPercentile(threshold time.Duration, capacity uint8, percentile float64, retention time.Duration)` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once considering the specified percentile.
Percentile values are kept in bounded buffer with capacity *c* defined by the specified capacity.
If retention is set then throttler state will be reseted after retention duration.
Use `WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*. | | monitor | `NewThrottlerMonitor(mnt Monitor, threshold Stats)` | Throttles call if any of the stats returned by the provided monitor exceeds any of the stats defined by the specified threshold or if any internal error occurred. | | metric | `NewThrottlerMetric(mtc Metric)` | Throttles call if metric defined by the specified metric is riched or if any internal error occurred. | | enqueuer | `NewThrottlerEnqueue(enq Enqueuer)` | Always enqueues message to the specified queue throttles only if any internal error occurred.
Use `WithData(ctx context.Context, data interface{}) context.Context` to specify context data for enqueued message and `WithMarshaler(ctx context.Context, mrsh Marshaler) context.Context` to specify context data marshaler. | diff --git a/heap.go b/heap.go deleted file mode 100644 index 33fe75f..0000000 --- a/heap.go +++ /dev/null @@ -1,55 +0,0 @@ -package gohalt - -import "sync" - -type blatheap struct { - buffer []uint64 - lock sync.Mutex -} - -func (lh *blatheap) Len() int { - lh.lock.Lock() - defer lh.lock.Unlock() - return len(lh.buffer) -} - -func (lh *blatheap) Less(i int, j int) bool { - lh.lock.Lock() - defer lh.lock.Unlock() - return lh.buffer[i] < lh.buffer[j] -} - -func (lh *blatheap) Swap(i int, j int) { - lh.lock.Lock() - defer lh.lock.Unlock() - lh.buffer[i], lh.buffer[j] = lh.buffer[j], lh.buffer[i] -} - -func (lh *blatheap) Push(x interface{}) { - lh.lock.Lock() - defer lh.lock.Unlock() - if lat, ok := x.(uint64); ok { - lh.buffer = append(lh.buffer, lat) - } -} - -func (lh *blatheap) Pop() interface{} { - lh.lock.Lock() - defer lh.lock.Unlock() - blen := len(lh.buffer) - val := lh.buffer[blen-1] - lh.buffer = lh.buffer[:blen-1] - return val -} - -func (lh *blatheap) At(pos int) uint64 { - lh.lock.Lock() - defer lh.lock.Unlock() - return lh.buffer[pos] -} - -func (lh *blatheap) Prune() { - lh.lock.Lock() - defer lh.lock.Unlock() - lh.buffer = nil -} diff --git a/percentiles.go b/percentiles.go new file mode 100644 index 0000000..95b5f10 --- /dev/null +++ b/percentiles.go @@ -0,0 +1,46 @@ +package gohalt + +import ( + "math" + "sort" + "sync" +) + +type percentiles struct { + buf []uint64 + cap uint8 + lock sync.Mutex +} + +func (p *percentiles) Len() int { + p.lock.Lock() + defer p.lock.Unlock() + return len(p.buf) +} + +func (p *percentiles) Push(dim uint64) { + p.lock.Lock() + defer p.lock.Unlock() + if len(p.buf) >= int(p.cap) { + p.buf = p.buf[1:] + } + p.buf = append(p.buf, dim) +} + +func (p *percentiles) At(pval float64) uint64 { + p.lock.Lock() + defer p.lock.Unlock() + buf := make([]uint64, len(p.buf)) + _ = copy(buf, p.buf) + sort.Slice(buf, func(i, j int) bool { + return buf[i] < buf[j] + }) + at := int(math.Round(float64(len(buf)-1) * pval)) + return buf[at] +} + +func (p *percentiles) Prune() { + p.lock.Lock() + defer p.lock.Unlock() + p.buf = make([]uint64, 0, p.cap) +} diff --git a/throttlers.go b/throttlers.go index f29e50e..09d9d17 100644 --- a/throttlers.go +++ b/throttlers.go @@ -1,7 +1,6 @@ package gohalt import ( - "container/heap" "context" "errors" "fmt" @@ -392,18 +391,20 @@ func (thr *tlatency) Release(ctx context.Context) error { type tpercentile struct { reset Runnable - latencies *blatheap + latencies *percentiles threshold time.Duration percentile float64 retention time.Duration } -func NewThrottlerPercentile(threshold time.Duration, percentile float64, retention time.Duration) tpercentile { +func NewThrottlerPercentile(threshold time.Duration, capacity uint8, percentile float64, retention time.Duration) tpercentile { percentile = math.Abs(percentile) if percentile > 1.0 { percentile = 1.0 } - thr := tpercentile{latencies: &blatheap{}, threshold: threshold, percentile: percentile, retention: retention} + thr := tpercentile{threshold: threshold, percentile: percentile, retention: retention} + thr.latencies = &percentiles{cap: capacity} + thr.latencies.Prune() thr.reset = locked( delayed(thr.retention, func(context.Context) error { thr.latencies.Prune() @@ -414,9 +415,8 @@ func NewThrottlerPercentile(threshold time.Duration, percentile float64, retenti } func (thr tpercentile) Acquire(ctx context.Context) error { - if length := thr.latencies.Len(); length > 0 { - at := int(math.Round(float64(length-1) * thr.percentile)) - if latency := thr.latencies.At(at); latency >= uint64(thr.threshold) { + if thr.latencies.Len() > 0 { + if latency := thr.latencies.At(thr.percentile); latency >= uint64(thr.threshold) { gorun(ctx, thr.reset) return errors.New("throttler has exceed latency threshold") } @@ -428,7 +428,7 @@ func (thr tpercentile) Release(ctx context.Context) error { nowTs := time.Now().UTC().UnixNano() ctxTs := ctxTimestamp(ctx).UnixNano() latency := uint64(nowTs - ctxTs) - heap.Push(thr.latencies, latency) + thr.latencies.Push(latency) return nil } diff --git a/throttlers_test.go b/throttlers_test.go index 1b55fa1..9e2c5b8 100644 --- a/throttlers_test.go +++ b/throttlers_test.go @@ -434,7 +434,7 @@ func TestThrottlers(t *testing.T) { }, "Throttler percentile should throttle on latency above threshold": { tms: 5, - thr: NewThrottlerPercentile(ms3_0, 0.5, ms7_0), + thr: NewThrottlerPercentile(ms3_0, 10, 0.5, ms7_0), tss: []time.Duration{ ms0_0, -ms5_0, @@ -451,7 +451,7 @@ func TestThrottlers(t *testing.T) { }, "Throttler percentile should throttle on latency above threshold after retention": { tms: 5, - thr: NewThrottlerPercentile(ms3_0, 1.5, ms5_0), + thr: NewThrottlerPercentile(ms3_0, 10, 1.5, ms5_0), tss: []time.Duration{ ms0_0, -ms5_0, @@ -473,6 +473,23 @@ func TestThrottlers(t *testing.T) { nil, }, }, + "Throttler percentile should throttle on latency above threshold with capacity": { + tms: 5, + thr: NewThrottlerPercentile(ms3_0, 1, 0.5, ms7_0), + tss: []time.Duration{ + ms0_0, + -ms5_0, + -ms1_0, + -ms5_0, + }, + errs: []error{ + nil, + nil, + errors.New("throttler has exceed latency threshold"), + nil, + errors.New("throttler has exceed latency threshold"), + }, + }, "Throttler adaptive should throttle on throttling adoptee": { tms: 3, thr: NewThrottlerAdaptive(