-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathlimiter.go
183 lines (152 loc) · 6.6 KB
/
limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package store
import (
"sync"
"github.com/alecthomas/units"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
// ChunksLimiter caps the number of chunks that may be reserved against a
// single limit.
type ChunksLimiter interface {
// Reserve num chunks out of the total number of chunks enforced by the limiter.
// Returns an error if the limit has been exceeded. This function must be
// goroutine safe.
Reserve(num uint64) error
}
// SeriesLimiter caps the number of series that may be reserved against a
// single limit.
type SeriesLimiter interface {
// Reserve num series out of the total number of series enforced by the limiter.
// Returns an error if the limit has been exceeded. This function must be
// goroutine safe.
Reserve(num uint64) error
}
// BytesLimiter caps the number of bytes that may be reserved against a
// single limit.
type BytesLimiter interface {
// ReserveWithType reserves num bytes out of the total amount of bytes enforced
// by the limiter. dataType is passed through for implementations that want to
// account per data type; the Limiter in this file ignores it.
// Returns an error if the limit has been exceeded. This function must be
// goroutine safe.
ReserveWithType(num uint64, dataType StoreDataType) error
}
// ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for
// projects depending on Thanos (e.g. Cortex) which have dynamic limits.
// failedCounter is the metric handed to the created limiter; the Limiter in
// this file increments it the first time its limit is exceeded.
type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter
// SeriesLimiterFactory is used to create a new SeriesLimiter.
// failedCounter is the metric handed to the created limiter; the Limiter in
// this file increments it the first time its limit is exceeded.
type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter
// BytesLimiterFactory is used to create a new BytesLimiter.
// failedCounter is the metric handed to the created limiter; the Limiter in
// this file increments it the first time its limit is exceeded.
type BytesLimiterFactory func(failedCounter prometheus.Counter) BytesLimiter
// Limiter is a simple mechanism for checking if something has passed a certain threshold.
// It holds a sync.Once and an atomic counter, so it must be shared by pointer
// and never copied after first use.
type Limiter struct {
// limit is the maximum total that may be reserved; 0 disables the check.
limit uint64
// reserved is the running total of everything reserved so far.
reserved atomic.Uint64
// Counter metric which we will increase if limit is exceeded.
failedCounter prometheus.Counter
// failedOnce ensures failedCounter is incremented at most once per Limiter.
failedOnce sync.Once
}
// NewLimiter builds a Limiter that enforces limit and reports the first
// violation through ctr. A limit of 0 disables enforcement.
func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter {
	l := new(Limiter)
	l.limit = limit
	l.failedCounter = ctr
	return l
}
// Reserve implements ChunksLimiter and SeriesLimiter. It delegates to
// ReserveWithType with the zero data type, which the Limiter ignores.
func (l *Limiter) Reserve(num uint64) error {
	return l.ReserveWithType(num, StoreDataType(0))
}
// ReserveWithType implements BytesLimiter. It adds num to the running total
// and returns an error once that total exceeds the configured limit. The data
// type argument is ignored. A nil *Limiter or a limit of 0 never fails.
//
// The failure counter, when one was supplied, is incremented at most once per
// Limiter no matter how many reservations fail.
func (l *Limiter) ReserveWithType(num uint64, _ StoreDataType) error {
	if l == nil || l.limit == 0 {
		return nil
	}

	reserved := l.reserved.Add(num)
	if reserved <= l.limit {
		return nil
	}

	// Concurrent callers may all observe the limit being exceeded; sync.Once
	// keeps the metric from being incremented more than once. Skip the metric
	// when no counter was provided: taking the method value of a nil interface
	// would panic instead of reporting the limit violation.
	if l.failedCounter != nil {
		l.failedOnce.Do(l.failedCounter.Inc)
	}
	return errors.Errorf("limit %v violated (got %v)", l.limit, reserved)
}
// NewChunksLimiterFactory returns a ChunksLimiterFactory whose limiters all
// enforce the same fixed limit.
func NewChunksLimiterFactory(limit uint64) ChunksLimiterFactory {
	var build ChunksLimiterFactory = func(ctr prometheus.Counter) ChunksLimiter {
		return NewLimiter(limit, ctr)
	}
	return build
}
// NewSeriesLimiterFactory returns a SeriesLimiterFactory whose limiters all
// enforce the same fixed limit.
func NewSeriesLimiterFactory(limit uint64) SeriesLimiterFactory {
	var build SeriesLimiterFactory = func(ctr prometheus.Counter) SeriesLimiter {
		return NewLimiter(limit, ctr)
	}
	return build
}
// NewBytesLimiterFactory returns a BytesLimiterFactory whose limiters all
// enforce the same fixed limit, expressed in bytes.
func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory {
	// The conversion does not depend on the counter, so do it once up front.
	limitBytes := uint64(limit)
	var build BytesLimiterFactory = func(ctr prometheus.Counter) BytesLimiter {
		return NewLimiter(limitBytes, ctr)
	}
	return build
}
// SeriesSelectLimits are limits applied against individual Series calls.
type SeriesSelectLimits struct {
// SeriesPerRequest is the maximum number of series allowed for a single
// Series request; 0 means no limit.
SeriesPerRequest uint64
// SamplesPerRequest is the maximum number of samples allowed for a single
// Series request; 0 means no limit.
SamplesPerRequest uint64
}
// RegisterFlags registers the per-request series and samples limit flags on
// cmd and binds them to the fields of l. Both flags default to 0 (no limit).
func (l *SeriesSelectLimits) RegisterFlags(cmd extkingpin.FlagClause) {
	const (
		seriesHelp  = "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit."
		samplesHelp = "The maximum samples allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains a maximum of 120 samples."
	)

	seriesFlag := cmd.Flag("store.limits.request-series", seriesHelp)
	seriesFlag.Default("0").Uint64Var(&l.SeriesPerRequest)

	samplesFlag := cmd.Flag("store.limits.request-samples", samplesHelp)
	samplesFlag.Default("0").Uint64Var(&l.SamplesPerRequest)
}
// Compile-time check that *limitedStoreServer implements storepb.StoreServer.
var _ storepb.StoreServer = (*limitedStoreServer)(nil)
// limitedStoreServer is a storepb.StoreServer that can apply series and sample limits against individual Series requests.
// All other StoreServer methods are served by the embedded implementation.
type limitedStoreServer struct {
storepb.StoreServer
// newSeriesLimiter creates the series limiter used for a single Series call.
newSeriesLimiter SeriesLimiterFactory
// newSamplesLimiter creates the limiter used for a single Series call to
// bound samples; it is a chunks limiter fed with a per-chunk sample estimate.
newSamplesLimiter ChunksLimiterFactory
// failedRequestsCounter counts dropped selects, labelled by "reason".
failedRequestsCounter *prometheus.CounterVec
}
// NewLimitedStoreServer wraps store so that every Series call is subject to
// the series and samples limits in selectLimits. The dropped-selects counter
// is registered with reg.
func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, selectLimits SeriesSelectLimits) storepb.StoreServer {
	dropped := promauto.With(reg).NewCounterVec(
		prometheus.CounterOpts{
			Name: "thanos_store_selects_dropped_total",
			Help: "Number of select queries that were dropped due to configured limits.",
		},
		[]string{"reason"},
	)

	srv := &limitedStoreServer{StoreServer: store, failedRequestsCounter: dropped}
	srv.newSeriesLimiter = NewSeriesLimiterFactory(selectLimits.SeriesPerRequest)
	srv.newSamplesLimiter = NewChunksLimiterFactory(selectLimits.SamplesPerRequest)
	return srv
}
// Series serves req through the wrapped StoreServer, sending responses via a
// stream wrapper that enforces fresh per-request series and chunks limiters.
func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
	counters := s.failedRequestsCounter
	wrapped := newLimitedServer(
		srv,
		s.newSeriesLimiter(counters.WithLabelValues("series")),
		s.newSamplesLimiter(counters.WithLabelValues("chunks")),
	)
	return s.StoreServer.Series(req, wrapped)
}
// Compile-time check that *limitedServer implements storepb.Store_SeriesServer.
var _ storepb.Store_SeriesServer = (*limitedServer)(nil)
// limitedServer is a storepb.Store_SeriesServer that charges every sent
// series against a series limiter and a samples limiter, and refuses to send
// once either limit is exceeded.
type limitedServer struct {
storepb.Store_SeriesServer
// seriesLimiter is charged 1 for each series response sent.
seriesLimiter SeriesLimiter
// samplesLimiter is charged len(chunks) * MaxSamplesPerChunk for each series
// response sent.
samplesLimiter ChunksLimiter
}
// newLimitedServer wraps upstream so that sent series are charged against
// seriesLimiter and chunksLimiter.
func newLimitedServer(upstream storepb.Store_SeriesServer, seriesLimiter SeriesLimiter, chunksLimiter ChunksLimiter) *limitedServer {
	ls := new(limitedServer)
	ls.Store_SeriesServer = upstream
	ls.seriesLimiter = seriesLimiter
	ls.samplesLimiter = chunksLimiter
	return ls
}
// Send forwards response to the wrapped stream after charging it against the
// per-request limits. Responses that carry no series are forwarded unchanged.
// For a series response, one series and len(Chunks) * MaxSamplesPerChunk
// samples are reserved; if either reservation fails the response is not sent
// and the wrapped limiter error is returned.
func (i *limitedServer) Send(response *storepb.SeriesResponse) error {
	series := response.GetSeries()
	if series == nil {
		// Nothing to account for; pass the response straight through.
		return i.Store_SeriesServer.Send(response)
	}

	if err := i.seriesLimiter.Reserve(1); err != nil {
		return errors.Wrap(err, "failed to send series")
	}

	// Widen before multiplying so the product is computed in uint64 and
	// cannot wrap around in platform-sized int arithmetic (e.g. 32-bit builds).
	samples := uint64(len(series.Chunks)) * uint64(MaxSamplesPerChunk)
	if err := i.samplesLimiter.Reserve(samples); err != nil {
		return errors.Wrap(err, "failed to send samples")
	}

	return i.Store_SeriesServer.Send(response)
}