-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathcache-collector.go
387 lines (333 loc) · 12.1 KB
/
cache-collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
package main
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"sync"
"time"
logcache "code.cloudfoundry.org/log-cache/pkg/client"
"code.cloudfoundry.org/log-cache/pkg/rpc/logcache_v1"
jwt "github.com/dgrijalva/jwt-go"
)
/*
SOME NOTES
{
"status": "success",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"addr": "10.1.1.31:8080",
"deployment": "cf-0e11708a646c5524f1e2",
"host": "",
"id": "062d6eab-c151-4b96-8668-3e5e623010ab",
"index": "062d6eab-c151-4b96-8668-3e5e623010ab",
"instance-id": "",
"ip": "10.1.1.31",
"job": "doppler",
"product": "Pivotal Application Service",
"source_id": "log-cache",
"system_domain": "system.domain"
},
"value": [
1557929172,
"218809"
]
}
]
}
}
CLI Range query
cf query 'ingress{source_id="doppler"}' --step 1 --start `date +'%s'` --end $(expr $(date +'%s') + 30) | jq
cf query 'rate(ingress{source_id="doppler"}[5m] offset 2m)' | jq
promql notes
https://prometheus.io/docs/prometheus/latest/querying/basics/#range-vector-selectors
*/
// CacheMetric is the "metric" label set of one item in a log-cache
// PromQL vector response (see the sample payload in the notes at the
// top of this file).
type CacheMetric struct {
	Addr         string `json:"addr"`
	Deployment   string `json:"deployment"`
	Host         string `json:"host"`
	ID           string `json:"id"`
	Index        string `json:"index"`
	InstanceID   string `json:"instance-id"`
	IP           string `json:"ip"`
	Job          string `json:"job"`
	Product      string `json:"product"`
	SourceID     string `json:"source_id"`
	SystemDomain string `json:"system_domain"`
}
// CacheResultItem is a single sample in a PromQL vector result: its
// metric labels plus a [timestamp, value] pair.
type CacheResultItem struct {
	Metric CacheMetric   `json:"metric"`
	Value  []interface{} `json:"value"` // [ timestamp int, value string ]
}
// CacheData is the "data" envelope of a PromQL query response.
type CacheData struct {
	ResultType string            `json:"resultType"` // e.g. "vector"
	Result     []CacheResultItem `json:"result"`
}
// CacheResult is the top-level response from a log-cache promql query.
type CacheResult struct {
	Status string    `json:"status"` // expected to be "success"
	Data   CacheData `json:"data"`
}
// HTTPClient is the minimal http.Client surface used by the log-cache
// client; it exists so a token-injecting wrapper can be passed in.
type HTTPClient interface {
	Do(req *http.Request) (*http.Response, error)
}
// tokenHTTPClient wraps an HTTPClient and adds the stored access token
// to every request's Authorization header.
// NOTE(review): accessToken is copied in at construction time; it is
// not updated when LCC refreshes its token via fetchToken — confirm a
// stale header here is acceptable.
type tokenHTTPClient struct {
	c           HTTPClient
	accessToken string
}
// Do injects the stored access token (when present) into the request's
// Authorization header, then delegates to the wrapped client.
func (c *tokenHTTPClient) Do(req *http.Request) (*http.Response, error) {
	if c.accessToken != "" {
		req.Header.Set("Authorization", c.accessToken)
	}
	return c.c.Do(req)
}
// InstanceMetrics average system metrics for instance groups (values
// are averaged across all instances of the job by the PromQL queries).
type InstanceMetrics struct {
	CPUUser float64
	CPUSys  float64
	CPUWait float64
	Memory  float64
	Count   int64  // number of instances
	Name    string // name
}
// TrafficControllerMetrics TC (traffic controller) metrics.
type TrafficControllerMetrics struct {
	System           InstanceMetrics
	SlowConsumers    float64
	AppStreams       float64
	Egress           float64
	Ingress          float64
	ContainerLatency float64
}
// RLPMetrics RLP ingress/egress/drop rates.
type RLPMetrics struct {
	Egress  float64
	Ingress float64
	Dropped float64
}
// MetronMetrics metron metrics.
type MetronMetrics struct {
	System      InstanceMetrics
	Ingress     float64
	Egress      float64
	Dropped     float64
	AVGEnvelope float64
	Name        string // name/index
}
// DopplerMetrics doppler metrics.
type DopplerMetrics struct {
	System InstanceMetrics
	// MessageRateCapacity is derived in Collect: ingress rate divided
	// by the doppler instance count.
	MessageRateCapacity float64
	Subscriptions       float64
	Egress              float64
	Ingress             float64
	IngressDropped      float64
	Dropped             float64
	Name                string // name/index
}
// SyslogAdapterMetrics syslog adapter metrics.
type SyslogAdapterMetrics struct {
	System  InstanceMetrics
	Ingress float64
	Dropped float64
}
// SyslogSchedulerMetrics syslog scheduler metrics (system-level only).
type SyslogSchedulerMetrics struct {
	System InstanceMetrics
}
// DrainMetrics Drain metrics reported by the syslog agent (see the
// GAUGE list in the comment below this type).
type DrainMetrics struct {
	AgentBindings          float64
	AgentIngress           float64
	AgentEgress            float64
	AgentDropped           float64
	AgentInvalidDrains     float64
	AgentActiveDrains      float64
	AgentNonAppDrains      float64
	AgentBlacklistedDrains float64
}
/*
GAUGE drains:0.000000
GAUGE non_app_drains:0.000000
GAUGE blacklisted_drains:0.000000
GAUGE active_drains:0.000000
GAUGE invalid_drains:0.000000
*/
// Metrics root of all computed metrics collected from log-cache.
type Metrics struct {
	System          []InstanceMetrics
	Doppler         DopplerMetrics
	TC              TrafficControllerMetrics
	RLP             RLPMetrics
	Metron          MetronMetrics
	Drain           DrainMetrics
	DopplerInstance DopplerMetrics
	MetronInstance  MetronMetrics
	SyslogAdapter   SyslogAdapterMetrics
	SyslogScheduler SyslogSchedulerMetrics
}
// LCC used to manage the log cache endpoint and credentials, and holds
// the most recently collected metrics.
type LCC struct {
	accessToken string // raw token; checkToken assumes a 7-char scheme prefix ("Bearer ")
	client      *logcache.Client
	Metric      Metrics // last collected metrics; guard reads with Lock/Unlock
	mux         sync.Mutex
	// Start/Stop are not set anywhere in this file — presumably managed
	// by callers; TODO confirm.
	Start            time.Time
	Stop             time.Time
	Offset           string  // PromQL offset used when building queries
	Duration         string  // PromQL range duration used when building queries
	CollectionErrors []error // query errors accumulated during collection
}
// Package-level PromQL query templates (fmt format strings taking
// metric, source_id and — for the *Job variants — job), rebuilt by
// (*LCC).updateQeries before each collection run.
// NOTE(review): mutable package state; in this file it is only written
// while Collect holds the LCC lock — confirm no other writers exist.
var (
	queryAvgRateJob         string // avg(rate(metric{source_id,job}[d] offset o))
	queryAvgRate            string // avg(rate(metric{source_id}[d] offset o))
	querySumRateJob         string // sum(rate(metric{source_id,job}[d] offset o))
	querySumRate            string // sum(rate(metric{source_id}[d] offset o))
	querySumJob             string // sum(metric{source_id,job} offset o)
	querySum                string // sum(metric{source_id} offset o)
	queryMetricJob          string // bare instant selector metric{source_id,job}
	queryIngressMaxOverTime string // per-index max_over_time of ingress, > 0
	queryMin                string // min(metric{source_id} offset o)
)
// NewLogCacheClient creates a new LCC for the given log-cache address
// and returns it. The error result is currently always nil.
//
// NOTE(review): TLS verification is disabled (InsecureSkipVerify) —
// confirm this is intended outside lab environments.
// NOTE(review): the access token is snapshotted into the HTTP client
// here; later fetchToken refreshes update lc.accessToken only, so the
// underlying client may keep sending the original token — verify.
func NewLogCacheClient(address string) (LCC, error) {
	lc := LCC{Metric: Metrics{}, CollectionErrors: make([]error, 0)}
	// Obtain the initial token via the cf CLI (fatal on failure).
	lc.fetchToken()
	h := http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
	tc := tokenHTTPClient{HTTPClient(&h), lc.accessToken}
	lc.client = logcache.NewClient(address, logcache.WithHTTPClient(&tc))
	return lc, nil
}
// fetchToken obtains a fresh access token from the cf CLI and stores
// it on the client; any failure is fatal.
func (lc *LCC) fetchToken() {
	token, err := cfCLI.AccessToken()
	if err != nil {
		logger.Fatalln(err)
	}
	lc.accessToken = token
}
// checkToken refreshes the cached access token when it is missing,
// unparseable, or within 60 seconds of the expiry in its "exp" claim.
// The jwt keyfunc returns a dummy key because only the claims are
// needed, not signature verification, so a key-type error from the jwt
// library is tolerated.
func (lc *LCC) checkToken() {
	const schemeLen = len("Bearer ")
	// Guard the slice below: a token shorter than the scheme prefix
	// previously caused a slice-bounds panic. Just fetch a fresh one.
	if len(lc.accessToken) < schemeLen {
		lc.fetchToken()
		return
	}
	t, err := jwt.Parse(lc.accessToken[schemeLen:], func(token *jwt.Token) (interface{}, error) { return []byte(""), nil })
	if err != nil && err.Error() != jwt.ErrInvalidKeyType.Error() {
		logger.Printf("Could not parse existing token \"%s\" so Fetching a new token\n", err)
		lc.fetchToken()
		// Fix: the original fell through here and dereferenced t, which
		// can be nil after a failed parse.
		return
	}
	if t == nil {
		lc.fetchToken()
		return
	}
	claims, ok := t.Claims.(jwt.MapClaims)
	if !ok {
		lc.fetchToken()
		return
	}
	exp, ok := claims["exp"].(float64)
	if !ok {
		// No usable expiry claim: refresh defensively rather than panic
		// on the type assertion.
		lc.fetchToken()
		return
	}
	// Fetch a new token if this one expires within the next 60 seconds.
	// Fix: the original compared exp against exp-60 (always true), so
	// the pre-expiry refresh never triggered.
	if int64(exp) <= time.Now().Unix()+60 {
		lc.fetchToken()
	}
}
// GetResult runs the instant PromQL query template q against log-cache
// for the given metric and source id (and job, when non-empty) and
// returns the raw query result.
func (lc *LCC) GetResult(metric, sourceid, job, q string) (*logcache_v1.PromQL_InstantQueryResult, error) {
	lc.checkToken()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	args := []interface{}{metric, sourceid}
	if job != "" {
		args = append(args, job)
	}
	return lc.client.PromQL(ctx, fmt.Sprintf(q, args...))
}
// GetSinlgeMetric runs the instant PromQL query template q against
// log-cache for the given metric and source id (and job, when
// non-empty) and reduces the response to a single float64 sample.
// Query failures are recorded in lc.CollectionErrors and yield 0.0.
// (Name typo is kept for caller compatibility.)
func (lc *LCC) GetSinlgeMetric(metric, sourceid, job, q string) float64 {
	lc.checkToken()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	var qformatted string
	if job != "" {
		qformatted = fmt.Sprintf(q, metric, sourceid, job)
	} else {
		qformatted = fmt.Sprintf(q, metric, sourceid)
	}
	result, err := lc.client.PromQL(ctx, qformatted)
	if err != nil {
		lc.CollectionErrors = append(lc.CollectionErrors, fmt.Errorf("%s: %s", qformatted, err))
		// Fix: return explicitly instead of dereferencing a possibly-nil
		// result and relying on the generated getters being nil-safe.
		// An error always means there is no usable sample.
		return 0.0
	}
	return getSingleSampleResult(result.GetVector().GetSamples())
}
// Collect runs one full collection pass against log-cache, refreshing
// every field of lc.Metric. The LCC lock is held for the whole pass so
// concurrent readers never observe a partially updated Metrics value.
// Individual query failures are appended to lc.CollectionErrors by the
// helpers; the returned error is currently always nil.
func (lc *LCC) Collect() error {
	lc.Lock()
	defer lc.Unlock()
	// NOTE(review): CollectionErrors is never reset, so it grows across
	// successive Collect calls — confirm whether callers expect per-run
	// or cumulative errors.
	lc.updateQeries(*sampleOffset, *sampleDuration)
	lc.getAvgSystemMetrics(&lc.Metric.TC.System, tcJob)
	lc.getAvgSystemMetrics(&lc.Metric.Doppler.System, dopplerJob)
	lc.Metric.TC.AppStreams = lc.GetSinlgeMetric(appStreamsGauge, trafficControllerSID, tcJob, querySumJob)
	lc.Metric.TC.SlowConsumers = lc.GetSinlgeMetric(slowConsumerCounter, trafficControllerSID, tcJob, queryAvgRateJob)
	lc.Metric.Drain.AgentIngress = lc.GetSinlgeMetric(ingressCounter, syslogAgentSID, "", querySumRate)
	lc.Metric.Drain.AgentEgress = lc.GetSinlgeMetric(egressCounter, syslogAgentSID, "", querySumRate)
	lc.Metric.Drain.AgentDropped = lc.GetSinlgeMetric(droppedCounter, syslogAgentSID, "", querySumRate)
	lc.Metric.Drain.AgentBindings = lc.GetSinlgeMetric(drainsGauge, syslogAgentSID, "", queryMin)
	lc.Metric.Drain.AgentActiveDrains = lc.GetSinlgeMetric(drainsActiveGauge, syslogAgentSID, "", queryMin)
	lc.Metric.Drain.AgentInvalidDrains = lc.GetSinlgeMetric(drainsInvlaidGauge, syslogAgentSID, "", queryMin)
	lc.Metric.Drain.AgentBlacklistedDrains = lc.GetSinlgeMetric(drainsBlacklistedGauge, syslogAgentSID, "", queryMin)
	lc.Metric.Drain.AgentNonAppDrains = lc.GetSinlgeMetric(drainsNonAppGauge, syslogAgentSID, "", queryMin)
	lc.Metric.Doppler.Ingress = lc.GetSinlgeMetric(ingressCounter, dopplerSID, dopplerJob, querySumRateJob)
	lc.Metric.Doppler.IngressDropped = lc.GetSinlgeMetric(droppedCounter, dopplerSID, "", queryIngressMaxOverTime)
	lc.Metric.Doppler.Egress = lc.GetSinlgeMetric(egressCounter, dopplerSID, dopplerJob, querySumRateJob)
	lc.Metric.Doppler.Dropped = lc.GetSinlgeMetric(droppedCounter, dopplerSID, dopplerJob, querySumRateJob)
	lc.Metric.Doppler.Subscriptions = lc.GetSinlgeMetric(subscriptionsGauge, dopplerSID, dopplerJob, querySumJob)
	lc.Metric.Metron.Ingress = lc.GetSinlgeMetric(ingressCounter, metronSID, "", querySumRate)
	lc.Metric.Metron.Egress = lc.GetSinlgeMetric(egressCounter, metronSID, "", querySumRate)
	lc.Metric.Metron.Dropped = lc.GetSinlgeMetric(droppedCounter, metronSID, "", querySumRate)
	lc.Metric.RLP.Ingress = lc.GetSinlgeMetric(ingressCounter, rlpSID, tcJob, querySumRateJob)
	lc.Metric.RLP.Egress = lc.GetSinlgeMetric(egressCounter, rlpSID, tcJob, querySumRateJob)
	lc.Metric.RLP.Dropped = lc.GetSinlgeMetric(droppedCounter, rlpSID, tcJob, querySumRateJob)
	// Fix: guard against a zero instance count (e.g. when the system
	// metrics query returned nothing); the previous unconditional
	// division produced +Inf or NaN here.
	if lc.Metric.Doppler.System.Count > 0 {
		lc.Metric.Doppler.MessageRateCapacity = lc.Metric.Doppler.Ingress / float64(lc.Metric.Doppler.System.Count)
	}
	return nil
}
// updateQeries rebuilds the package-level PromQL query templates with
// the supplied offset and duration, and records both on the client.
// Each template is a fmt format string taking metric, source_id and
// (for the *Job variants) job. (Name typo kept for compatibility.)
func (lc *LCC) updateQeries(offset, duration string) {
	lc.Offset = offset
	lc.Duration = duration
	window := "[" + duration + "] offset " + offset // range selector plus offset
	instant := " offset " + offset                  // offset for instant selectors
	queryAvgRateJob = `avg(rate(%s{source_id="%s",job="%s"}` + window + `))`
	queryAvgRate = `avg(rate(%s{source_id="%s"}` + window + `))`
	querySumRateJob = `sum(rate(%s{source_id="%s",job="%s"}` + window + `))`
	querySumRate = `sum(rate(%s{source_id="%s"}` + window + `))`
	querySumJob = `sum(%s{source_id="%s",job="%s"}` + instant + `)`
	querySum = `sum(%s{source_id="%s"}` + instant + `)`
	queryMin = `min(%s{source_id="%s"}` + instant + `)`
	queryMetricJob = `%s{source_id="%s",job="%s"}`
	queryIngressMaxOverTime = `sum(max_over_time(%s{source_id="%s", direction="ingress"}[` + duration + `])) by (index) > 0`
}
// metric helpers

// setInstanceCount sets system.Count to the number of vector samples
// returned for the cpu-user gauge — i.e. one sample per reporting
// instance of the job. A failed query is recorded and yields a count
// of zero.
func (lc *LCC) setInstanceCount(system *InstanceMetrics, job string) {
	result, err := lc.GetResult(cpuUserGauge, boshSystemMetricsSID, job, queryMetricJob)
	if err != nil {
		lc.CollectionErrors = append(lc.CollectionErrors, err)
	}
	system.Count = int64(len(result.GetVector().GetSamples()))
}
// getAvgSystemMetrics fills system with the average CPU/memory gauges
// for the given job (from the bosh system metrics source) and then
// records the job's instance count.
func (lc *LCC) getAvgSystemMetrics(system *InstanceMetrics, job string) {
	avg := func(gauge string) float64 {
		return lc.GetSinlgeMetric(gauge, boshSystemMetricsSID, job, queryAvgRateJob)
	}
	system.CPUUser = avg(cpuUserGauge)
	system.CPUWait = avg(cpuWaitGauge)
	system.CPUSys = avg(cpuSYSGauge)
	system.Memory = avg(memoryPercentGauge)
	lc.setInstanceCount(system, job)
}
// Lock acquires the client mutex; hold it while updating or reading
// lc.Metric so readers never see a partially collected snapshot.
func (lc *LCC) Lock() {
	lc.mux.Lock()
}
// Unlock releases the client mutex acquired by Lock.
func (lc *LCC) Unlock() {
	lc.mux.Unlock()
}
// getSingleSampleResult returns the value of the first sample in the
// slice, or 0.0 when the slice is empty.
func getSingleSampleResult(sample []*logcache_v1.PromQL_Sample) float64 {
	if len(sample) == 0 {
		return 0.0
	}
	return sample[0].GetPoint().GetValue()
}