windowedthroughput.go
package dynsampler
import (
"math"
"sync"
"time"
)
// Windowed Throughput sampling is an enhanced version of total throughput sampling.
// Just like the original throughput sampler, it attempts to meet the goal of fixed number of events
// per second sent to Honeycomb.
//
// The original throughput sampler updates the sampling rate every "ClearFrequency" seconds. While
// this parameter is configurable, it suffers from the following tradeoff:
// - Decreasing it makes you more responsive to load spikes, but with the cost of making the
// sampling decision on less data.
// - Increasing it makes you less responsive to load spikes, but your sample rates will be more
// stable because they are made with more data.
//
// The windowed throughput sampler resolves this by introducing two different, tunable parameters:
// - UpdateFrequency: how often the sampling rate is recomputed
// - LookbackFrequency: how far back in time we look to recompute our sampling rate.
//
// A standard configuration would be to set UpdateFrequency to 1s and LookbackFrequency to 30s. In
// this configuration, every second, we look back at the last 30s of data in order to compute the new
// sampling rate. The actual sampling rate computation is nearly identical to the original
// throughput sampler, but this variant has better support for floating point numbers.
//
// Because our lookback window is _rolling_ instead of static, we need a special data structure to
// quickly and efficiently store our data. The code and additional information for this
// data structure can be found in blocklist.go.
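//
// A minimal configuration sketch (the values shown mirror the defaults described above):
//
//	sampler := &WindowedThroughput{
//		UpdateFrequencyDuration:   time.Second,
//		LookbackFrequencyDuration: 30 * time.Second,
//		GoalThroughputPerSec:      100,
//	}
//	if err := sampler.Start(); err != nil {
//		// handle the startup error
//	}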
type WindowedThroughput struct {
// UpdateFrequencyDuration is how often the sampling rate is recomputed. Default is 1s.
UpdateFrequencyDuration time.Duration
// LookbackFrequencyDuration is how far back in time we look to dynamically adjust our sampling
// rate. Default is 30 * UpdateFrequencyDuration. This will be 30s assuming the default
// configuration of UpdateFrequencyDuration. We enforce this to be an _integer multiple_ of
// UpdateFrequencyDuration.
LookbackFrequencyDuration time.Duration
// Target throughput per second.
GoalThroughputPerSec float64
// MaxKeys, if greater than 0, limits the number of distinct keys used to build
// the sample rate map within the interval defined by `LookbackFrequencyDuration`. Once
// MaxKeys is reached, new keys will not be included in the sample rate map, but
// existing keys will continue to be counted.
// If MaxKeys is set to 0 (default), there is no upper bound on the number of distinct keys.
MaxKeys int
savedSampleRates map[string]int
done chan struct{}
countList BlockList
indexGenerator IndexGenerator
lock sync.Mutex
// metrics
requestCount int64
eventCount int64
numKeys int
}
// Ensure we implement the sampler interface
var _ Sampler = (*WindowedThroughput)(nil)
// An IndexGenerator turns timestamps into indexes. This is essentially a bucketing mechanism.
type IndexGenerator interface {
// Get the index corresponding to the current time.
GetCurrentIndex() int64
// Return the index differential for a particular duration, i.e. how many ticks of the
// index a duration such as 5 seconds spans.
DurationToIndexes(duration time.Duration) int64
}
// The standard implementation of the index generator.
type UnixSecondsIndexGenerator struct {
DurationPerIndex time.Duration
}
func (g *UnixSecondsIndexGenerator) GetCurrentIndex() int64 {
nsec := time.Now().UnixNano()
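// The integer division below buckets timestamps: with DurationPerIndex = 1s, for example, every
// nanosecond timestamp within the same wall-clock second maps to the same index.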
return nsec / g.DurationPerIndex.Nanoseconds()
}
func (g *UnixSecondsIndexGenerator) DurationToIndexes(duration time.Duration) int64 {
return duration.Nanoseconds() / g.DurationPerIndex.Nanoseconds()
}
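// Start applies defaults, initializes internal state, and launches a background goroutine that
// recomputes sample rates every UpdateFrequencyDuration. Call Start before requesting sample rates.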
func (t *WindowedThroughput) Start() error {
// apply defaults
if t.UpdateFrequencyDuration == 0 {
t.UpdateFrequencyDuration = time.Second
}
if t.LookbackFrequencyDuration == 0 {
t.LookbackFrequencyDuration = 30 * t.UpdateFrequencyDuration
}
// Floor LookbackFrequencyDuration to be an integer multiple of UpdateFrequencyDuration.
t.LookbackFrequencyDuration = t.UpdateFrequencyDuration *
(t.LookbackFrequencyDuration / t.UpdateFrequencyDuration)
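// For example, UpdateFrequencyDuration = 1s and LookbackFrequencyDuration = 45500ms floor to a
// 45s lookback window.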
if t.GoalThroughputPerSec == 0 {
t.GoalThroughputPerSec = 100
}
// Initialize countList.
if t.MaxKeys > 0 {
t.countList = NewBoundedBlockList(t.MaxKeys)
} else {
t.countList = NewUnboundedBlockList()
}
// Initialize internal variables.
t.savedSampleRates = make(map[string]int)
t.done = make(chan struct{})
// Initialize the index generator. Each UpdateFrequencyDuration represents a single tick of the
// index.
t.indexGenerator = &UnixSecondsIndexGenerator{
DurationPerIndex: t.UpdateFrequencyDuration,
}
// Spin up calculator.
go func() {
ticker := time.NewTicker(t.UpdateFrequencyDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
t.updateMaps()
case <-t.done:
return
}
}
}()
return nil
}
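// Stop signals the background recomputation goroutine to exit.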
func (t *WindowedThroughput) Stop() error {
close(t.done)
return nil
}
// updateMaps recomputes the sample rate based on the countList.
func (t *WindowedThroughput) updateMaps() {
currentIndex := t.indexGenerator.GetCurrentIndex()
lookbackIndexes := t.indexGenerator.DurationToIndexes(t.LookbackFrequencyDuration)
aggregateCounts := t.countList.AggregateCounts(currentIndex, lookbackIndexes)
// Apply the same aggregation algorithm as total throughput
// Short circuit if no traffic
numKeys := len(aggregateCounts)
if numKeys == 0 {
// no traffic during the last period.
t.lock.Lock()
defer t.lock.Unlock()
t.numKeys = 0
t.savedSampleRates = make(map[string]int)
return
}
// figure out our target throughput per key over the lookback window.
totalGoalThroughput := t.GoalThroughputPerSec * t.LookbackFrequencyDuration.Seconds()
// split the total throughput equally across the number of keys.
throughputPerKey := float64(totalGoalThroughput) / float64(numKeys)
// for each key, calculate sample rate by dividing counted events by the
// desired number of events
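// As an illustrative example: GoalThroughputPerSec = 100 with a 30s lookback gives a total goal
// throughput of 3000 events; with 10 keys that is 300 events per key, so a key that saw 1500
// events over the window gets a sample rate of 5 (keep roughly 1 in 5).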
newSavedSampleRates := make(map[string]int)
for k, v := range aggregateCounts {
rate := int(math.Max(1, (float64(v) / float64(throughputPerKey))))
newSavedSampleRates[k] = rate
}
// save newly calculated sample rates
t.lock.Lock()
defer t.lock.Unlock()
t.savedSampleRates = newSavedSampleRates
t.numKeys = numKeys
}
// GetSampleRate takes a key and returns the appropriate sample rate for that
// key.
func (t *WindowedThroughput) GetSampleRate(key string) int {
return t.GetSampleRateMulti(key, 1)
}
// GetSampleRateMulti takes a key and a count of spans it represents, and returns the
// appropriate sample rate for that key.
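//
// A hedged usage sketch (rand, trace, and its methods are assumptions for illustration, not part
// of this package); a returned rate of 0 means no rate is available yet for the key:
//
//	rate := sampler.GetSampleRateMulti(trace.Key(), trace.SpanCount())
//	keep := rate > 0 && rand.Intn(rate) == 0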
func (t *WindowedThroughput) GetSampleRateMulti(key string, count int) int {
t.requestCount++
t.eventCount += int64(count)
// Insert the new key into the map.
current := t.indexGenerator.GetCurrentIndex()
err := t.countList.IncrementKey(key, current, count)
// If we've reached MaxKeys, the key was not counted; return 0 (no sample rate available).
if err != nil {
return 0
}
t.lock.Lock()
defer t.lock.Unlock()
if rate, found := t.savedSampleRates[key]; found {
return rate
}
return 0
}
// SaveState is not implemented
func (t *WindowedThroughput) SaveState() ([]byte, error) {
return nil, nil
}
// LoadState is not implemented
func (t *WindowedThroughput) LoadState(state []byte) error {
return nil
}
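// GetMetrics returns the sampler's internal counters, with each key prefixed by the supplied
// prefix. For example, a prefix of "dynsampler_" (illustrative) yields dynsampler_request_count,
// dynsampler_event_count, and dynsampler_keyspace_size.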
func (t *WindowedThroughput) GetMetrics(prefix string) map[string]int64 {
t.lock.Lock()
defer t.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": t.requestCount,
prefix + "event_count": t.eventCount,
prefix + "keyspace_size": int64(t.numKeys),
}
return mets
}