forked from census-instrumentation/opencensus-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add derived gauge example. (census-instrumentation#1110)
* Add derived gauge example. * fix fmt error and unreachable code error. * fix typos.
- Loading branch information
Showing
2 changed files
with
585 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,348 @@ | ||
# Derived Gauge Example | ||
|
||
Table of Contents | ||
================= | ||
- [Summary](#summary) | ||
- [Run the example](#run-the-example) | ||
- [How to use derived gauges?](#how-to-use-derived-gauges-) | ||
* [Initialize Metric Registry](#initialize-metric-registry) | ||
* [Create derived gauge metric](#create-derived-gauge-metric) | ||
* [Create derived gauge entry](#create-derived-gauge-entry) | ||
* [Implement derived gauge interface](#implement-derived-gauge-interface) | ||
* [Complete Example](#complete-example) | ||
|
||
|
||
|
||
## Summary | ||
[top](#Table-of-Contents) | ||
|
||
This example demonstrates the use of derived gauges. It is a simple interactive program of consumer | ||
and producer. User can input number of items to produce. Producer produces specified number of | ||
items. Consumer consumes randomly consumes 1-5 items in each attempt. It then sleeps randomly | ||
between 1-10 seconds before the next attempt. | ||
|
||
There are two metrics collected to monitor the queue. | ||
1. **queue_size**: It is an instantaneous queue size represented using derived gauge int64. | ||
1. **queue_seconds_since_processed_last**: It is the time elaspsed in seconds since the last time | ||
when the queue was consumed. It is represented using derived gauge float64. | ||
This example shows how to use gauge metrics. The program records two gauges. | ||
|
||
These metrics are read when exporter scrapes them. In this example prometheus exporter is used to | ||
scrape the data. Metrics can be viewed at [http://localhost:9090/metrics](http://localhost:9090/metrics) once the program is running. | ||
|
||
Enter different value for number of items to queue and fetch the metrics using above url to see the variation in the metrics. | ||
|
||
## Run the example | ||
|
||
``` | ||
$ go get go.opencensus.io/examples/derived_gauges/... | ||
``` | ||
|
||
then: | ||
|
||
``` | ||
$ go run $(go env GOPATH)/src/go.opencensus.io/examples/derived_gauges/derived_gauge.go | ||
``` | ||
|
||
## How to use derived gauges? | ||
|
||
### Initialize Metric Registry | ||
Create a new metric registry for all your metrics. | ||
This step is a general step for any kind of metrics and not specific to gauges. | ||
Register newly created registry with global producer manager. | ||
|
||
[embedmd]:# (derived_gauge.go reg) | ||
```go | ||
r := metric.NewRegistry() | ||
metricproducer.GlobalManager().AddProducer(r) | ||
``` | ||
|
||
|
||
### Create derived gauge metric | ||
Create a gauge metric. In this example we have two metrics. | ||
|
||
**queue_size** | ||
|
||
[embedmd]:# (derived_gauge.go size) | ||
```go | ||
queueSizeGauge, err := r.AddInt64DerivedGauge( | ||
"queue_size", | ||
metric.WithDescription("Instantaneous queue size"), | ||
metric.WithUnit(metricdata.UnitDimensionless)) | ||
if err != nil { | ||
log.Fatalf("error creating queue size derived gauge, error %v\n", err) | ||
} | ||
``` | ||
|
||
**queue_seconds_since_processed_last** | ||
|
||
[embedmd]:# (derived_gauge.go elapsed) | ||
```go | ||
elapsedSeconds, err := r.AddFloat64DerivedGauge( | ||
"queue_seconds_since_processed_last", | ||
metric.WithDescription("time elapsed since last time the queue was processed"), | ||
metric.WithUnit(metricdata.UnitDimensionless)) | ||
if err != nil { | ||
log.Fatalf("error creating queue_seconds_since_processed_last derived gauge, error %v\n", err) | ||
} | ||
``` | ||
|
||
### Create derived gauge entry | ||
Now, create or insert a unique entry an interface `ToInt64` for a given set of tags. Since we are not using any tags in this example we only insert one entry for each derived gauge metric. | ||
|
||
**insert interface for queue_size** | ||
|
||
[embedmd]:# (derived_gauge.go entrySize) | ||
```go | ||
err = queueSizeGauge.UpsertEntry(q.Size) | ||
if err != nil { | ||
log.Fatalf("error getting queue size derived gauge entry, error %v\n", err) | ||
} | ||
``` | ||
|
||
**insert interface for queue_seconds_since_processed_lasto** | ||
|
||
[embedmd]:# (derived_gauge.go entryElapsed) | ||
```go | ||
err = elapsedSeconds.UpsertEntry(q.Elapsed) | ||
if err != nil { | ||
log.Fatalf("error getting queue_seconds_since_processed_last derived gauge entry, error %v\n", err) | ||
} | ||
``` | ||
|
||
|
||
### Implement derived gauge interface | ||
In order for metrics reader to read the value of your dervied gauge it must | ||
implement ToFloat64 or ToInt64 | ||
|
||
[embedmd]:# (derived_gauge.go toint64) | ||
```go | ||
func (q *queue) Size() int64 { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
return int64(q.size) | ||
} | ||
|
||
``` | ||
|
||
[embedmd]:# (derived_gauge.go tofloat64) | ||
```go | ||
func (q *queue) Elapsed() float64 { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
return time.Now().Sub(q.lastConsumed).Seconds() | ||
} | ||
|
||
``` | ||
|
||
|
||
### Complete Example | ||
|
||
[embedmd]:# (derived_gauge.go entire) | ||
```go | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"math/rand" | ||
"sync" | ||
"time" | ||
|
||
"bufio" | ||
"go.opencensus.io/exporter/prometheus" | ||
"go.opencensus.io/metric" | ||
"go.opencensus.io/metric/metricdata" | ||
"go.opencensus.io/metric/metricproducer" | ||
"net/http" | ||
"os" | ||
"strconv" | ||
"strings" | ||
) | ||
|
||
// This example demonstrates the use of derived gauges. It is a simple interactive program of consumer | ||
// and producer. User can input number of items to produce. Producer produces specified number of | ||
// items. Consumer consumes randomly consumes 1-5 items in each attempt. It then sleeps randomly | ||
// between 1-10 seconds before the next attempt. | ||
// | ||
// There are two metrics collected to monitor the queue. | ||
// 1. queue_size: It is an instantaneous queue size represented using derived gauge int64. | ||
// 2. queue_seconds_since_processed_last: It is the time elaspsed in seconds since the last time | ||
// when the queue was consumed. It is represented using derived gauge float64. | ||
type queue struct { | ||
size int | ||
q []int | ||
lastConsumed time.Time | ||
mu sync.Mutex | ||
} | ||
|
||
var q = &queue{} | ||
|
||
const ( | ||
maxItemsToConsumePerAttempt = 25 | ||
) | ||
|
||
func init() { | ||
q.q = make([]int, 100) | ||
} | ||
|
||
// consume randomly dequeues upto 5 items from the queue | ||
func (q *queue) consume() { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
|
||
consumeCount := rand.Int() % maxItemsToConsumePerAttempt | ||
i := 0 | ||
for i = 0; i < consumeCount; i++ { | ||
if q.size > 0 { | ||
q.q = q.q[1:] | ||
q.size-- | ||
} else { | ||
break | ||
} | ||
} | ||
if i > 0 { | ||
q.lastConsumed = time.Now() | ||
} | ||
} | ||
|
||
// produce randomly enqueues upto 5 items from the queue | ||
func (q *queue) produce(count int) { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
|
||
for i := 0; i < count; i++ { | ||
v := rand.Int() % 100 | ||
q.q = append(q.q, v) | ||
q.size++ | ||
} | ||
fmt.Printf("queued %d items, queue size is %d\n", count, q.size) | ||
} | ||
|
||
func (q *queue) runConsumer(interval int, cQuit chan bool) { | ||
t := time.NewTicker(time.Duration(interval) * time.Second) | ||
for { | ||
select { | ||
case <-t.C: | ||
q.consume() | ||
case <-cQuit: | ||
t.Stop() | ||
return | ||
} | ||
} | ||
} | ||
|
||
// Size reports instantaneous queue size. | ||
// This is the interface supplied while creating an entry for derived gauge int64. | ||
func (q *queue) Size() int64 { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
return int64(q.size) | ||
} | ||
|
||
|
||
// Elapsed reports time elapsed since the last time an item was consumed from the queue. | ||
// This is the interface supplied while creating an entry for derived gauge float64. | ||
func (q *queue) Elapsed() float64 { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
return time.Now().Sub(q.lastConsumed).Seconds() | ||
} | ||
|
||
|
||
func getInput() int { | ||
reader := bufio.NewReader(os.Stdin) | ||
limit := 100 | ||
for { | ||
fmt.Printf("Enter number of items to put in consumer queue? [1-%d]: ", limit) | ||
text, _ := reader.ReadString('\n') | ||
count, err := strconv.Atoi(strings.TrimSuffix(text, "\n")) | ||
if err == nil { | ||
if count < 1 || count > limit { | ||
fmt.Printf("invalid value %s\n", text) | ||
continue | ||
} | ||
return count | ||
} | ||
fmt.Printf("error %v\n", err) | ||
} | ||
} | ||
|
||
func doWork() { | ||
fmt.Printf("Program monitors queue using two derived gauge metrics.\n") | ||
fmt.Printf(" 1. queue_size = the instantaneous size of the queue.\n") | ||
fmt.Printf(" 2. queue_seconds_since_processed_last = the number of seconds elapsed since last time the queue was processed.\n") | ||
fmt.Printf("Go to http://localhost:9090/metrics to see the metrics.\n\n\n") | ||
|
||
// Take a number of items to queue as an input from the user | ||
// and enqueue the same number of items on to the consumer queue. | ||
for { | ||
count := getInput() | ||
q.produce(count) | ||
fmt.Printf("press CTRL+C to terminate the program\n") | ||
} | ||
} | ||
|
||
func createAndStartExporter() { | ||
// Create Prometheus metrics exporter to verify derived gauge metrics in this example. | ||
exporter, err := prometheus.NewExporter(prometheus.Options{}) | ||
if err != nil { | ||
log.Fatalf("Failed to create the prometheus metrics exporter: %v", err) | ||
} | ||
http.Handle("/metrics", exporter) | ||
go func() { | ||
log.Fatal(http.ListenAndServe(":9090", nil)) | ||
|
||
}() | ||
} | ||
|
||
func main() { | ||
createAndStartExporter() | ||
|
||
// Create metric registry and register it with global producer manager. | ||
r := metric.NewRegistry() | ||
metricproducer.GlobalManager().AddProducer(r) | ||
|
||
// Create Int64DerviedGauge | ||
queueSizeGauge, err := r.AddInt64DerivedGauge( | ||
"queue_size", | ||
metric.WithDescription("Instantaneous queue size"), | ||
metric.WithUnit(metricdata.UnitDimensionless)) | ||
if err != nil { | ||
log.Fatalf("error creating queue size derived gauge, error %v\n", err) | ||
} | ||
|
||
err = queueSizeGauge.UpsertEntry(q.Size) | ||
if err != nil { | ||
log.Fatalf("error getting queue size derived gauge entry, error %v\n", err) | ||
} | ||
|
||
// Create Float64DerviedGauge | ||
elapsedSeconds, err := r.AddFloat64DerivedGauge( | ||
"queue_seconds_since_processed_last", | ||
metric.WithDescription("time elapsed since last time the queue was processed"), | ||
metric.WithUnit(metricdata.UnitDimensionless)) | ||
if err != nil { | ||
log.Fatalf("error creating queue_seconds_since_processed_last derived gauge, error %v\n", err) | ||
} | ||
|
||
err = elapsedSeconds.UpsertEntry(q.Elapsed) | ||
if err != nil { | ||
log.Fatalf("error getting queue_seconds_since_processed_last derived gauge entry, error %v\n", err) | ||
} | ||
|
||
cQuit := make(chan bool) | ||
defer func() { | ||
cQuit <- true | ||
close(cQuit) | ||
}() | ||
|
||
// Run consumer and producer | ||
go q.runConsumer(5, cQuit) | ||
|
||
for { | ||
doWork() | ||
} | ||
} | ||
|
||
``` |
Oops, something went wrong.