Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Exemplar: Use generic interface for attachment values. (#1070)
Browse files Browse the repository at this point in the history
  • Loading branch information
songy23 authored Mar 20, 2019
1 parent ebb7978 commit 8a36f74
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 34 deletions.
2 changes: 1 addition & 1 deletion metric/metricdata/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ type Exemplar struct {
}

// Attachments is a map of extra values associated with a recorded data point.
type Attachments map[string]string
type Attachments map[string]interface{}
2 changes: 1 addition & 1 deletion stats/internal/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

// DefaultRecorder will be called for each Record call.
var DefaultRecorder func(tags *tag.Map, measurement interface{}, attachments map[string]string)
var DefaultRecorder func(tags *tag.Map, measurement interface{}, attachments map[string]interface{})

// SubscriptionReporter reports when a view subscribed with a measure.
var SubscriptionReporter func(measure string)
2 changes: 1 addition & 1 deletion stats/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func Record(ctx context.Context, ms ...Measurement) {
return
}
// TODO(songy23): fix attachments.
recorder(tag.FromContext(ctx), ms, map[string]string{})
recorder(tag.FromContext(ctx), ms, map[string]interface{}{})
}

// RecordWithTags records one or multiple measurements at once.
Expand Down
38 changes: 28 additions & 10 deletions stats/view/aggregation_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package view

import (
"math"
"time"

"go.opencensus.io/metric/metricdata"
)
Expand All @@ -26,7 +27,7 @@ import (
// Mosts users won't directly access aggregration data.
type AggregationData interface {
isAggregationData() bool
addSample(v float64)
addSample(v float64, attachments map[string]interface{}, t time.Time)
clone() AggregationData
equal(other AggregationData) bool
}
Expand All @@ -43,7 +44,7 @@ type CountData struct {

func (a *CountData) isAggregationData() bool { return true }

func (a *CountData) addSample(_ float64) {
func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) {
a.Value = a.Value + 1
}

Expand All @@ -70,7 +71,7 @@ type SumData struct {

func (a *SumData) isAggregationData() bool { return true }

func (a *SumData) addSample(v float64) {
func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
a.Value += v
}

Expand Down Expand Up @@ -101,7 +102,7 @@ type DistributionData struct {
SumOfSquaredDev float64 // sum of the squared deviation from the mean
CountPerBucket []int64 // number of occurrences per bucket
// ExemplarsPerBucket is slice the same length as CountPerBucket containing
// an metricdata for the associated bucket, or nil.
// an exemplar for the associated bucket, or nil.
ExemplarsPerBucket []*metricdata.Exemplar
bounds []float64 // histogram distribution of the values
}
Expand Down Expand Up @@ -130,15 +131,15 @@ func (a *DistributionData) variance() float64 {
func (a *DistributionData) isAggregationData() bool { return true }

// TODO(songy23): support exemplar attachments.
func (a *DistributionData) addSample(v float64) {
func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) {
if v < a.Min {
a.Min = v
}
if v > a.Max {
a.Max = v
}
a.Count++
a.addToBucket(v)
a.addToBucket(v, attachments, t)

if a.Count == 1 {
a.Mean = v
Expand All @@ -150,18 +151,35 @@ func (a *DistributionData) addSample(v float64) {
a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean)
}

func (a *DistributionData) addToBucket(v float64) {
func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) {
var count *int64
for i, b := range a.bounds {
var i int
var b float64
for i, b = range a.bounds {
if v < b {
count = &a.CountPerBucket[i]
break
}
}
if count == nil { // Last bucket.
count = &a.CountPerBucket[len(a.bounds)]
i = len(a.bounds)
count = &a.CountPerBucket[i]
}
*count++
if exemplar := getExemplar(v, attachments, t); exemplar != nil {
a.ExemplarsPerBucket[i] = exemplar
}
}

func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar {
if len(attachments) == 0 {
return nil
}
return &metricdata.Exemplar{
Value: v,
Timestamp: t,
Attachments: attachments,
}
}

func (a *DistributionData) clone() AggregationData {
Expand Down Expand Up @@ -199,7 +217,7 @@ func (l *LastValueData) isAggregationData() bool {
return true
}

func (l *LastValueData) addSample(v float64) {
func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
l.Value = v
}

Expand Down
34 changes: 22 additions & 12 deletions stats/view/aggregation_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package view
import (
"reflect"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -66,12 +67,15 @@ func TestDataClone(t *testing.T) {

func TestDistributionData_addSample(t *testing.T) {
dd := newDistributionData([]float64{1, 2})
dd.addSample(0.5)
attachments1 := map[string]interface{}{"key1": "value1"}
t1 := time.Now()
dd.addSample(0.5, attachments1, t1)

e1 := &metricdata.Exemplar{Value: 0.5, Timestamp: t1, Attachments: attachments1}
want := &DistributionData{
Count: 1,
CountPerBucket: []int64{1, 0, 0},
ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil, nil},
ExemplarsPerBucket: []*metricdata.Exemplar{e1, nil, nil},
Max: 0.5,
Min: 0.5,
Mean: 0.5,
Expand All @@ -81,13 +85,16 @@ func TestDistributionData_addSample(t *testing.T) {
t.Fatalf("Unexpected DistributionData -got +want: %s", diff)
}

dd.addSample(0.7)
attachments2 := map[string]interface{}{"key2": "value2"}
t2 := t1.Add(time.Microsecond)
dd.addSample(0.7, attachments2, t2)

// Previous exemplar should be preserved, since it has more annotations.
// Previous exemplar should be overwritten.
e2 := &metricdata.Exemplar{Value: 0.7, Timestamp: t2, Attachments: attachments2}
want = &DistributionData{
Count: 2,
CountPerBucket: []int64{2, 0, 0},
ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil, nil},
ExemplarsPerBucket: []*metricdata.Exemplar{e2, nil, nil},
Max: 0.7,
Min: 0.5,
Mean: 0.6,
Expand All @@ -97,16 +104,19 @@ func TestDistributionData_addSample(t *testing.T) {
t.Fatalf("Unexpected DistributionData -got +want: %s", diff)
}

dd.addSample(0.2)
attachments3 := map[string]interface{}{"key3": "value3"}
t3 := t2.Add(time.Microsecond)
dd.addSample(1.2, attachments3, t3)

// Exemplar should be replaced since it has a trace_id.
// e3 is at another bucket. e2 should still be there.
e3 := &metricdata.Exemplar{Value: 1.2, Timestamp: t3, Attachments: attachments3}
want = &DistributionData{
Count: 3,
CountPerBucket: []int64{3, 0, 0},
ExemplarsPerBucket: []*metricdata.Exemplar{nil, nil, nil},
Max: 0.7,
Min: 0.2,
Mean: 0.4666666666666667,
CountPerBucket: []int64{2, 1, 0},
ExemplarsPerBucket: []*metricdata.Exemplar{e2, e3, nil},
Max: 1.2,
Min: 0.5,
Mean: 0.7999999999999999,
SumOfSquaredDev: 0,
}
if diff := cmpDD(dd, want); diff != "" {
Expand Down
5 changes: 3 additions & 2 deletions stats/view/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package view

import (
"sort"
"time"

"go.opencensus.io/internal/tagencoding"
"go.opencensus.io/tag"
Expand All @@ -31,13 +32,13 @@ type collector struct {
a *Aggregation
}

func (c *collector) addSample(s string, v float64) {
func (c *collector) addSample(s string, v float64, attachments map[string]interface{}, t time.Time) {
aggregator, ok := c.signatures[s]
if !ok {
aggregator = c.a.newData()
c.signatures[s] = aggregator
}
aggregator.addSample(v)
aggregator.addSample(v, attachments, t)
}

// collectRows returns a snapshot of the collected Row values.
Expand Down
4 changes: 2 additions & 2 deletions stats/view/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ func (v *viewInternal) collectedRows() []*Row {
return v.collector.collectedRows(v.view.TagKeys)
}

func (v *viewInternal) addSample(m *tag.Map, val float64) {
func (v *viewInternal) addSample(m *tag.Map, val float64, attachments map[string]interface{}, t time.Time) {
if !v.isSubscribed() {
return
}
sig := string(encodeWithKeys(m, v.view.TagKeys))
v.collector.addSample(sig, val)
v.collector.addSample(sig, val, attachments, t)
}

// A Data is a set of rows about usage of the single measure associated
Expand Down
5 changes: 3 additions & 2 deletions stats/view/view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package view
import (
"context"
"testing"
"time"

"github.com/google/go-cmp/cmp"

Expand Down Expand Up @@ -177,7 +178,7 @@ func Test_View_MeasureFloat64_AggregationDistribution(t *testing.T) {
if err != nil {
t.Errorf("%v: New = %v", tc.label, err)
}
view.addSample(tag.FromContext(ctx), r.f)
view.addSample(tag.FromContext(ctx), r.f, nil, time.Now())
}

gotRows := view.collectedRows()
Expand Down Expand Up @@ -293,7 +294,7 @@ func Test_View_MeasureFloat64_AggregationSum(t *testing.T) {
if err != nil {
t.Errorf("%v: New = %v", tt.label, err)
}
view.addSample(tag.FromContext(ctx), r.f)
view.addSample(tag.FromContext(ctx), r.f, nil, time.Now())
}

gotRows := view.collectedRows()
Expand Down
2 changes: 1 addition & 1 deletion stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func RetrieveData(viewName string) ([]*Row, error) {
return resp.rows, resp.err
}

func record(tags *tag.Map, ms interface{}, attachments map[string]string) {
func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
req := &recordReq{
tm: tags,
ms: ms.([]stats.Measurement),
Expand Down
4 changes: 2 additions & 2 deletions stats/view/worker_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (cmd *retrieveDataReq) handleCommand(w *worker) {
type recordReq struct {
tm *tag.Map
ms []stats.Measurement
attachments map[string]string
attachments map[string]interface{}
t time.Time
}

Expand All @@ -159,7 +159,7 @@ func (cmd *recordReq) handleCommand(w *worker) {
}
ref := w.getMeasureRef(m.Measure().Name())
for v := range ref.views {
v.addSample(cmd.tm, m.Value())
v.addSample(cmd.tm, m.Value(), cmd.attachments, time.Now())
}
}
}
Expand Down

0 comments on commit 8a36f74

Please sign in to comment.