-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththrottle.go
111 lines (94 loc) · 3.41 KB
/
throttle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package grpc_athrottle
import (
"context"
"time"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc"
)
// ThrottleOptions contains options for the adaptive throttling logic.
type ThrottleOptions struct {
	// WindowDuration is the sliding window used to calculate the throttle
	// ratio.
	WindowDuration time.Duration
	// MinRequestCount is the number of requests that must be present within
	// WindowDuration before throttling.
	MinRequestCount int
	// MaxRatio is the ratio of requests/successes that must be met before
	// throttling begins.
	MaxRatio float32
	// Callback is invoked for counter events; useful for logging throttle
	// events and metrics.
	Callback func(CounterEvent)
}
// DefaultOptions provides recommended defaults for ThrottleOptions: a
// three-minute window, a minimum of 25 requests, a maximum ratio of 2, and a
// Callback that does nothing.
var DefaultOptions = ThrottleOptions{
	WindowDuration:  3 * time.Minute,
	MinRequestCount: 25,
	MaxRatio:        2.0,
	Callback:        func(_ CounterEvent) {},
}
// overloadCodes lists the gRPC status codes treated as server-side rejection
// or overloading.
//
// DO NOT increment the accept count when an error response carries one of
// these codes. DO increment the accept count for normal application errors,
// i.e. errors whose code is not listed here.
var overloadCodes = []codes.Code{
	codes.Aborted,
	codes.FailedPrecondition,
	codes.Unavailable,
	codes.DeadlineExceeded,
	codes.ResourceExhausted,
}
// shouldAccept reports whether the outcome of a call should be counted as
// accepted. A nil error is accepted. An error from which no gRPC status can
// be extracted is not accepted. Any other error is accepted unless its status
// code appears in overloadCodes.
func shouldAccept(err error) bool {
	if err == nil {
		return true
	}

	st, isStatus := status.FromError(err)
	if !isStatus {
		// No gRPC status is attached, so the status code is unknown.
		return false
	}

	code := st.Code()
	overloaded := false
	for i := range overloadCodes {
		if overloadCodes[i] == code {
			overloaded = true
			break
		}
	}
	return !overloaded
}
// newCounter builds the Counter that backs an interceptor from throttleOpts.
//
// A nil throttleOpts selects DefaultOptions. The sliding window is split into
// fixed 10-second bins; the bin count is WindowDuration divided by the bin
// duration, rounded down. A WindowDuration shorter than one bin (including
// zero or negative values) is clamped to a single bin so that NewCounter is
// never asked for zero or a negative number of bins. When Callback is nil the
// callback from DefaultOptions is used instead, so an options struct that
// leaves Callback unset does not hand a nil function to NewCounter. The
// caller's options struct is never modified.
func newCounter(throttleOpts *ThrottleOptions) *Counter {
	const binDuration = 10 * time.Second

	if throttleOpts == nil {
		throttleOpts = &DefaultOptions
	}

	binCount := int(throttleOpts.WindowDuration / binDuration)
	if binCount < 1 {
		// Windows shorter than one bin would otherwise yield no bins at all.
		binCount = 1
	}

	callback := throttleOpts.Callback
	if callback == nil {
		callback = DefaultOptions.Callback
	}

	return NewCounter(
		binCount,
		binDuration,
		throttleOpts.MinRequestCount,
		throttleOpts.MaxRatio,
		time.Now().UnixNano(),
		callback)
}
// NewClientUnaryInterceptor returns a unary client interceptor that throttles
// client requests based on the adaptive throttling model described in the
// Google SRE book: https://landing.google.com/sre/book/chapters/handling-overload.html
//
// One counter is created from throttleOpts and shared by every call made
// through the returned interceptor. For each call:
//   - if the counter decides to reject, the call fails locally with a
//     codes.ResourceExhausted status and the invoker is not called;
//   - otherwise the request is recorded, the invoker is called, and the call
//     is recorded as accepted when shouldAccept approves of the invoker's
//     error. The invoker's error is returned unchanged.
//
// NOTE(review): the variadic opts accepted here are not applied to any call;
// only the per-call options handed to the interceptor reach the invoker.
func NewClientUnaryInterceptor(throttleOpts *ThrottleOptions, opts ...grpc.CallOption) grpc.UnaryClientInterceptor {
	counter := newCounter(throttleOpts)
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, callOpts ...grpc.CallOption) error {
		if counter.RejectNext() {
			// status.Error replaces the deprecated grpc.Errorf; the resulting
			// code and message are the same.
			return status.Error(codes.ResourceExhausted, "client request throttled")
		}

		counter.MarkRequest()
		err := invoker(ctx, method, req, reply, cc, callOpts...)
		if shouldAccept(err) {
			counter.MarkAccept()
		}
		return err
	}
}
// NewClientStreamInterceptor returns a stream client interceptor that
// throttles client requests based on the adaptive throttling model described
// in the Google SRE book: https://landing.google.com/sre/book/chapters/handling-overload.html
//
// One counter is created from throttleOpts and shared by every stream opened
// through the returned interceptor. For each stream:
//   - if the counter decides to reject, stream creation fails locally with a
//     codes.ResourceExhausted status and the streamer is not called;
//   - otherwise the request is recorded, the streamer is called, and the
//     request is recorded as accepted when shouldAccept approves of the
//     streamer's error. The streamer's results are returned unchanged.
//
// Only the error from creating the stream is inspected; errors that occur
// later while using the stream are not observed by this interceptor.
//
// NOTE(review): the variadic opts accepted here are not applied to any call;
// only the per-call options handed to the interceptor reach the streamer.
func NewClientStreamInterceptor(throttleOpts *ThrottleOptions, opts ...grpc.CallOption) grpc.StreamClientInterceptor {
	counter := newCounter(throttleOpts)
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, callOpts ...grpc.CallOption) (grpc.ClientStream, error) {
		if counter.RejectNext() {
			// status.Error replaces the deprecated grpc.Errorf; the resulting
			// code and message are the same.
			return nil, status.Error(codes.ResourceExhausted, "client request throttled")
		}

		counter.MarkRequest()
		stream, err := streamer(ctx, desc, cc, method, callOpts...)
		if shouldAccept(err) {
			counter.MarkAccept()
		}
		return stream, err
	}
}