Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
stats worker as metric producer. (#1078)
Browse files Browse the repository at this point in the history
* stats worker as metric producer.

* fixed the conversion based on measure type in addition to aggregation type.
fixed test. Specifically removed json comparision.

* fixed review comments related to count float64.

* add check for metricType in toPoint func for distribution
Also replaced reflect.DeepEqual with cmp.Equal
  • Loading branch information
rghetia authored Mar 27, 2019
1 parent 5ae9166 commit ec71c97
Show file tree
Hide file tree
Showing 6 changed files with 678 additions and 6 deletions.
59 changes: 59 additions & 0 deletions stats/view/aggregation_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type AggregationData interface {
addSample(v float64, attachments map[string]interface{}, t time.Time)
clone() AggregationData
equal(other AggregationData) bool
toPoint(t metricdata.Type, time time.Time) metricdata.Point
}

const epsilon = 1e-9
Expand Down Expand Up @@ -61,6 +62,15 @@ func (a *CountData) equal(other AggregationData) bool {
return a.Value == a2.Value
}

func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
switch metricType {
case metricdata.TypeCumulativeInt64:
return metricdata.NewInt64Point(t, a.Value)
default:
panic("unsupported metricdata.Type")
}
}

// SumData is the aggregated data for the Sum aggregation.
// A sum aggregation processes data and sums up the recordings.
//
Expand All @@ -87,6 +97,17 @@ func (a *SumData) equal(other AggregationData) bool {
return math.Pow(a.Value-a2.Value, 2) < epsilon
}

func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
switch metricType {
case metricdata.TypeCumulativeInt64:
return metricdata.NewInt64Point(t, int64(a.Value))
case metricdata.TypeCumulativeFloat64:
return metricdata.NewFloat64Point(t, a.Value)
default:
panic("unsupported metricdata.Type")
}
}

// DistributionData is the aggregated data for the
// Distribution aggregation.
//
Expand Down Expand Up @@ -208,6 +229,33 @@ func (a *DistributionData) equal(other AggregationData) bool {
return a.Count == a2.Count && a.Min == a2.Min && a.Max == a2.Max && math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
}

func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
switch metricType {
case metricdata.TypeCumulativeDistribution:
buckets := []metricdata.Bucket{}
for i := 0; i < len(a.CountPerBucket); i++ {
buckets = append(buckets, metricdata.Bucket{
Count: a.CountPerBucket[i],
Exemplar: a.ExemplarsPerBucket[i],
})
}
bucketOptions := &metricdata.BucketOptions{Bounds: a.bounds}

val := &metricdata.Distribution{
Count: a.Count,
Sum: a.Sum(),
SumOfSquaredDeviation: a.SumOfSquaredDev,
BucketOptions: bucketOptions,
Buckets: buckets,
}
return metricdata.NewDistributionPoint(t, val)

default:
// TODO: [rghetia] when we have a use case for TypeGaugeDistribution.
panic("unsupported metricdata.Type")
}
}

// LastValueData returns the last value recorded for LastValue aggregation.
type LastValueData struct {
Value float64
Expand All @@ -232,3 +280,14 @@ func (l *LastValueData) equal(other AggregationData) bool {
}
return l.Value == a2.Value
}

func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
switch metricType {
case metricdata.TypeGaugeInt64:
return metricdata.NewInt64Point(t, int64(l.Value))
case metricdata.TypeGaugeFloat64:
return metricdata.NewFloat64Point(t, l.Value)
default:
panic("unsupported metricdata.Type")
}
}
13 changes: 8 additions & 5 deletions stats/view/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"

"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
Expand Down Expand Up @@ -116,15 +117,17 @@ func dropZeroBounds(bounds ...float64) []float64 {

// viewInternal is the internal representation of a View.
type viewInternal struct {
view *View // view is the canonicalized View definition associated with this view.
subscribed uint32 // 1 if someone is subscribed and data need to be exported, use atomic to access
collector *collector
view *View // view is the canonicalized View definition associated with this view.
subscribed uint32 // 1 if someone is subscribed and data need to be exported, use atomic to access
collector *collector
metricDescriptor *metricdata.Descriptor
}

func newViewInternal(v *View) (*viewInternal, error) {
return &viewInternal{
view: v,
collector: &collector{make(map[string]AggregationData), v.Aggregation},
view: v,
collector: &collector{make(map[string]AggregationData), v.Aggregation},
metricDescriptor: viewToMetricDescriptor(v),
}, nil
}

Expand Down
131 changes: 131 additions & 0 deletions stats/view/view_to_metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package view

import (
"time"

"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats"
)

func getUnit(unit string) metricdata.Unit {
switch unit {
case "1":
return metricdata.UnitDimensionless
case "ms":
return metricdata.UnitMilliseconds
case "By":
return metricdata.UnitBytes
}
return metricdata.UnitDimensionless
}

func getType(v *View) metricdata.Type {
m := v.Measure
agg := v.Aggregation

switch agg.Type {
case AggTypeSum:
switch m.(type) {
case *stats.Int64Measure:
return metricdata.TypeCumulativeInt64
case *stats.Float64Measure:
return metricdata.TypeCumulativeFloat64
default:
panic("unexpected measure type")
}
case AggTypeDistribution:
return metricdata.TypeCumulativeDistribution
case AggTypeLastValue:
switch m.(type) {
case *stats.Int64Measure:
return metricdata.TypeGaugeInt64
case *stats.Float64Measure:
return metricdata.TypeGaugeFloat64
default:
panic("unexpected measure type")
}
case AggTypeCount:
switch m.(type) {
case *stats.Int64Measure:
return metricdata.TypeCumulativeInt64
case *stats.Float64Measure:
return metricdata.TypeCumulativeInt64
default:
panic("unexpected measure type")
}
default:
panic("unexpected aggregation type")
}
}

func getLableKeys(v *View) []string {
labelKeys := []string{}
for _, k := range v.TagKeys {
labelKeys = append(labelKeys, k.Name())
}
return labelKeys
}

func viewToMetricDescriptor(v *View) *metricdata.Descriptor {
return &metricdata.Descriptor{
Name: v.Name,
Description: v.Description,
Unit: getUnit(v.Measure.Unit()),
Type: getType(v),
LabelKeys: getLableKeys(v),
}
}

func toLabelValues(row *Row) []metricdata.LabelValue {
labelValues := []metricdata.LabelValue{}
for _, tag := range row.Tags {
labelValues = append(labelValues, metricdata.NewLabelValue(tag.Value))
}
return labelValues
}

func rowToTimeseries(v *viewInternal, row *Row, now time.Time, startTime time.Time) *metricdata.TimeSeries {
return &metricdata.TimeSeries{
Points: []metricdata.Point{row.Data.toPoint(v.metricDescriptor.Type, now)},
LabelValues: toLabelValues(row),
StartTime: startTime,
}
}

func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricdata.Metric {
if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 ||
v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 {
startTime = time.Time{}
}

rows := v.collectedRows()
if len(rows) == 0 {
return nil
}

ts := []*metricdata.TimeSeries{}
for _, row := range rows {
ts = append(ts, rowToTimeseries(v, row, now, startTime))
}

m := &metricdata.Metric{
Descriptor: *v.metricDescriptor,
TimeSeries: ts,
}
return m
}
Loading

0 comments on commit ec71c97

Please sign in to comment.