diff --git a/go.mod b/go.mod
index 872d382..ee8e284 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,9 @@ require (
 	github.com/smartystreets/assertions v1.0.1
 	github.com/smartystreets/goconvey v1.6.4
 	github.com/stretchr/testify v1.7.0
-	google.golang.org/grpc v1.40.0
+	go.opentelemetry.io/proto/otlp v0.12.0
+	google.golang.org/grpc v1.43.0
+	google.golang.org/protobuf v1.27.1
 )
 
 require (
@@ -39,6 +41,5 @@ require (
 	golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
 	golang.org/x/text v0.3.6 // indirect
 	google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
-	google.golang.org/protobuf v1.27.1 // indirect
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
 )
diff --git a/go.sum b/go.sum
index cf20b2b..1556e12 100644
--- a/go.sum
+++ b/go.sum
@@ -155,7 +155,11 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
 github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
 github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
 github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
+github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
 github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
 github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
 github.com/containerd/containerd v1.4.3/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
@@ -224,6 +228,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
 github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
 github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
 github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
+github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/esimonov/ifshort v1.0.1/go.mod h1:yZqNJUrNn20K8Q9n2CrjTKYyVEmX209Hgu+M1LBpeZE=
 github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
@@ -1120,6 +1125,8 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
 go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
 go.opentelemetry.io/collector v0.28.0/go.mod h1:AP/BTXwo1eedoJO7V+HQ68CSvJU1lcdqOzJCgt1VsNs=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
+go.opentelemetry.io/proto/otlp v0.12.0 h1:CMJ/3Wp7iOWES+CYLfnBv+DVmPbB+kmy9PJ92XvlR6c=
+go.opentelemetry.io/proto/otlp v0.12.0/go.mod h1:TsIjwGWIx5VFYv9KGVlOpxoBl5Dy+63SUguV7GGvlSQ=
 go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
@@ -1623,8 +1630,9 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
 google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
 google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
 google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
-google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q=
 google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
+google.golang.org/grpc v1.43.0 h1:Eeu7bZtDZ2DpRCsLhUlcrLnvYaMK1Gz86a+hMVvELmM=
+google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
diff --git a/protocol/otlp/decoder.go b/protocol/otlp/decoder.go
new file mode 100644
index 0000000..b6a6720
--- /dev/null
+++ b/protocol/otlp/decoder.go
@@ -0,0 +1,60 @@
+package otlp
+
+import (
+	"bytes"
+	"context"
+	"net/http"
+	"sync"
+
+	"github.com/signalfx/golib/v3/datapoint/dpsink"
+	"github.com/signalfx/golib/v3/log"
+	"github.com/signalfx/ingest-protocols/logkey"
+	"github.com/signalfx/ingest-protocols/protocol"
+	"github.com/signalfx/ingest-protocols/protocol/signalfx"
+	metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
+	"google.golang.org/protobuf/proto"
+)
+
+type httpMetricDecoder struct {
+	sink   dpsink.Sink
+	logger log.Logger
+	buffs  sync.Pool
+}
+
+// NewHTTPMetricDecoder returns a signalfx.ErrorReader that decodes OTLP metric export requests.
+func NewHTTPMetricDecoder(sink dpsink.Sink, logger log.Logger) signalfx.ErrorReader {
+	return &httpMetricDecoder{
+		sink:   sink,
+		logger: log.NewContext(logger).With(logkey.Protocol, "otlp"),
+		buffs: sync.Pool{
+			New: func() interface{} {
+				return new(bytes.Buffer)
+			},
+		},
+	}
+}
+
+func (d *httpMetricDecoder) Read(ctx context.Context, req *http.Request) (err error) {
+	jeff := d.buffs.Get().(*bytes.Buffer)
+	defer d.buffs.Put(jeff)
+	jeff.Reset()
+	if err = protocol.ReadFromRequest(jeff, req, d.logger); err != nil {
+		return err
+	}
+	var msg metricsservicev1.ExportMetricsServiceRequest
+	if err = proto.Unmarshal(jeff.Bytes(), &msg); err != nil {
+		return err
+	}
+	dps := FromOTLPMetricRequest(&msg)
+	if len(dps) > 0 {
+		err = d.sink.AddDatapoints(ctx, dps)
+	}
+	return err
+}
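+
+// A minimal usage sketch (hypothetical wiring; the sink and logger come from
+// whatever ingest server embeds this decoder):
+//
+//	decoder := NewHTTPMetricDecoder(sink, log.Discard)
+//	// decoder satisfies signalfx.ErrorReader, so it can be mounted on an HTTP
+//	// path the same way the existing SignalFx protobuf decoders are.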
"github.com/smartystreets/goconvey/convey" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" + "google.golang.org/protobuf/proto" +) + +var errReadErr = errors.New("could not read") + +type errorReader struct{} + +func (errorReader *errorReader) Read([]byte) (int, error) { + return 0, errReadErr +} + +func TestDecoder(t *testing.T) { + Convey("httpMetricDecoder", t, func() { + sendTo := dptest.NewBasicSink() + decoder := NewHTTPMetricDecoder(sendTo, log.Discard) + + Convey("Bad request reading", func() { + req := &http.Request{ + Body: io.NopCloser(&errorReader{}), + } + req.ContentLength = 1 + ctx := context.Background() + So(decoder.Read(ctx, req), ShouldEqual, errReadErr) + }) + + Convey("Bad request content", func() { + req := &http.Request{ + Body: io.NopCloser(bytes.NewBufferString("asdf")), + } + req.ContentLength = 4 + ctx := context.Background() + So(decoder.Read(ctx, req), ShouldNotBeNil) + }) + + Convey("Good request", func(c C) { + var msg metricsservicev1.ExportMetricsServiceRequest + msg.ResourceMetrics = []*metricsv1.ResourceMetrics{ + { + InstrumentationLibraryMetrics: []*metricsv1.InstrumentationLibraryMetrics{ + { + Metrics: []*metricsv1.Metric{ + { + Name: "test", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + { + Attributes: []*commonv1.KeyValue{}, + StartTimeUnixNano: 1000, + TimeUnixNano: 1000, + Value: &metricsv1.NumberDataPoint_AsInt{AsInt: 4}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + b, _ := proto.Marshal(&msg) + req := &http.Request{ + Body: io.NopCloser(bytes.NewBuffer(b)), + } + req.ContentLength = int64(len(b)) + ctx := context.Background() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + dp := <-sendTo.PointsChan + c.So(dp, ShouldNotBeNil) + wg.Done() + }() + + So(decoder.Read(ctx, req), ShouldBeNil) + + wg.Wait() + }) + }) +} diff --git a/protocol/otlp/metrics.go b/protocol/otlp/metrics.go new file mode 100644 index 0000000..d98abf1 --- /dev/null +++ b/protocol/otlp/metrics.go @@ -0,0 +1,358 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This code is copied and modified directly from the OTEL Collector: +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/1d6309bb62264cc7e2dda076ed95385b1ddef28a/pkg/translator/signalfx/from_metrics.go + +package otlp + +import ( + "encoding/json" + "math" + "strconv" + "time" + + "github.com/signalfx/golib/v3/datapoint" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" +) + +var ( + // Some standard dimension keys. + // upper bound dimension key for histogram buckets. + upperBoundDimensionKey = "upper_bound" + + // infinity bound dimension value is used on all histograms. 
+	infinityBoundSFxDimValue = float64ToDimValue(math.Inf(1))
+)
+
+// FromOTLPMetricRequest converts the ResourceMetrics in an incoming request to SignalFx datapoints.
+func FromOTLPMetricRequest(md *metricsservicev1.ExportMetricsServiceRequest) []*datapoint.Datapoint {
+	return FromOTLPResourceMetrics(md.GetResourceMetrics())
+}
+
+// FromOTLPResourceMetrics converts OTLP ResourceMetrics to SignalFx datapoints.
+func FromOTLPResourceMetrics(rms []*metricsv1.ResourceMetrics) []*datapoint.Datapoint {
+	var sfxDps []*datapoint.Datapoint
+
+	for _, rm := range rms {
+		firstDPIdx := len(sfxDps)
+		for _, ilm := range rm.GetInstrumentationLibraryMetrics() {
+			for _, m := range ilm.GetMetrics() {
+				sfxDps = append(sfxDps, FromMetric(m)...)
+			}
+		}
+
+		// Apply the resource attributes only to the datapoints produced from
+		// this ResourceMetrics, without overriding existing dimensions.
+		extraDimensions := attributesToDimensions(rm.GetResource().GetAttributes())
+		for i := firstDPIdx; i < len(sfxDps); i++ {
+			dpDims := sfxDps[i].Dimensions
+			for k, v := range extraDimensions {
+				if _, ok := dpDims[k]; !ok {
+					dpDims[k] = v
+				}
+			}
+		}
+	}
+
+	return sfxDps
+}
+
+// FromMetric converts an OTLP Metric to SignalFx datapoint(s).
+func FromMetric(m *metricsv1.Metric) []*datapoint.Datapoint {
+	var dps []*datapoint.Datapoint
+
+	basePoint := &datapoint.Datapoint{
+		Metric:     m.GetName(),
+		MetricType: fromMetricTypeToMetricType(m),
+	}
+
+	data := m.GetData()
+	switch data.(type) {
+	case *metricsv1.Metric_Gauge:
+		dps = convertNumberDataPoints(m.GetGauge().GetDataPoints(), basePoint)
+	case *metricsv1.Metric_Sum:
+		dps = convertNumberDataPoints(m.GetSum().GetDataPoints(), basePoint)
+	case *metricsv1.Metric_Histogram:
+		dps = convertHistogram(m.GetHistogram().GetDataPoints(), basePoint)
+	case *metricsv1.Metric_ExponentialHistogram:
+		// TODO: Add support for these
+	case *metricsv1.Metric_Summary:
+		dps = convertSummaryDataPoints(m.GetSummary().GetDataPoints(), m.GetName())
+	}
+
+	return dps
+}
+
+func fromMetricTypeToMetricType(m *metricsv1.Metric) datapoint.MetricType {
+	data := m.GetData()
+	switch data.(type) {
+	case *metricsv1.Metric_Gauge:
+		return datapoint.Gauge
+
+	case *metricsv1.Metric_Sum:
+		if !m.GetSum().GetIsMonotonic() {
+			return datapoint.Gauge
+		}
+		if m.GetSum().GetAggregationTemporality() == metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA {
+			return datapoint.Count
+		}
+		return datapoint.Counter
+
+	case *metricsv1.Metric_Histogram:
+		if m.GetHistogram().GetAggregationTemporality() == metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA {
+			return datapoint.Count
+		}
+		return datapoint.Counter
+
+	case *metricsv1.Metric_Summary:
+		return datapoint.Counter
+	}
+
+	return datapoint.Gauge
+}
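+
+// Quick reference for the mapping above (summary, not normative): Gauges and
+// non-monotonic Sums become SignalFx gauges; monotonic delta Sums and delta
+// Histograms become counts; monotonic cumulative Sums, cumulative Histograms,
+// and Summaries become cumulative counters; anything else defaults to gauge.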
+
+func convertNumberDataPoints(in []*metricsv1.NumberDataPoint, basePoint *datapoint.Datapoint) []*datapoint.Datapoint {
+	out := make([]*datapoint.Datapoint, 0, len(in))
+
+	for _, inDp := range in {
+		dp := *basePoint
+		dp.Timestamp = time.Unix(0, int64(inDp.GetTimeUnixNano()))
+		dp.Dimensions = attributesToDimensions(inDp.GetAttributes())
+
+		dp.Value = numberToSignalFxValue(inDp)
+
+		out = append(out, &dp)
+	}
+	return out
+}
+
+func convertHistogram(histDPs []*metricsv1.HistogramDataPoint, basePoint *datapoint.Datapoint) []*datapoint.Datapoint {
+	var out []*datapoint.Datapoint
+
+	for _, histDP := range histDPs {
+		attrDims := attributesToDimensions(histDP.GetAttributes())
+		ts := time.Unix(0, int64(histDP.GetTimeUnixNano()))
+
+		countDP := *basePoint
+		countDP.Metric = basePoint.Metric + "_count"
+		countDP.Timestamp = ts
+		countDP.Dimensions = attrDims
+		count := int64(histDP.GetCount())
+		countDP.Value = datapoint.NewIntValue(count)
+
+		sumDP := *basePoint
+		sumDP.Timestamp = ts
+		sumDP.Dimensions = attrDims
+		sum := histDP.GetSum()
+		sumDP.Value = datapoint.NewFloatValue(sum)
+
+		out = append(out, &countDP, &sumDP)
+
+		bounds := histDP.GetExplicitBounds()
+		counts := histDP.GetBucketCounts()
+
+		// Spec says counts is optional but if present it must have one more
+		// element than the bounds array.
+		if len(counts) > 0 && len(counts) != len(bounds)+1 {
+			continue
+		}
+
+		for j, c := range counts {
+			bound := infinityBoundSFxDimValue
+			if j < len(bounds) {
+				bound = float64ToDimValue(bounds[j])
+			}
+
+			dp := *basePoint
+			dp.Metric = basePoint.Metric + "_bucket"
+			dp.Timestamp = ts
+			dp.Dimensions = mergeStringMaps(attrDims, map[string]string{
+				upperBoundDimensionKey: bound,
+			})
+			dp.Value = datapoint.NewIntValue(int64(c))
+
+			out = append(out, &dp)
+		}
+	}
+
+	return out
+}
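+
+// Worked example (illustrative only): a histogram datapoint named "latency"
+// with Count=16, ExplicitBounds=[1,2,4], and BucketCounts=[4,2,3,7] becomes
+// "latency_count"=16, "latency"=<sum>, and four "latency_bucket" points whose
+// upper_bound dimensions are "1", "2", "4", and "+Inf", with values 4, 2, 3, 7.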
+
+func convertSummaryDataPoints(
+	in []*metricsv1.SummaryDataPoint,
+	name string,
+) []*datapoint.Datapoint {
+	out := make([]*datapoint.Datapoint, 0, len(in))
+
+	for _, inDp := range in {
+		dims := attributesToDimensions(inDp.GetAttributes())
+		ts := time.Unix(0, int64(inDp.GetTimeUnixNano()))
+
+		countPt := datapoint.Datapoint{
+			Metric:     name + "_count",
+			Timestamp:  ts,
+			Dimensions: dims,
+			MetricType: datapoint.Counter,
+		}
+		c := int64(inDp.GetCount())
+		countPt.Value = datapoint.NewIntValue(c)
+		out = append(out, &countPt)
+
+		sumPt := datapoint.Datapoint{
+			Metric:     name,
+			Timestamp:  ts,
+			Dimensions: dims,
+			MetricType: datapoint.Counter,
+		}
+		sum := inDp.GetSum()
+		sumPt.Value = datapoint.NewFloatValue(sum)
+		out = append(out, &sumPt)
+
+		qvs := inDp.GetQuantileValues()
+		for _, qv := range qvs {
+			qPt := datapoint.Datapoint{
+				Metric:    name + "_quantile",
+				Timestamp: ts,
+				Dimensions: mergeStringMaps(dims, map[string]string{
+					"quantile": strconv.FormatFloat(qv.GetQuantile(), 'f', -1, 64),
+				}),
+				MetricType: datapoint.Gauge,
+			}
+			qPt.Value = datapoint.NewFloatValue(qv.GetValue())
+			out = append(out, &qPt)
+		}
+	}
+	return out
+}
+
+func attributesToDimensions(attributes []*commonv1.KeyValue) map[string]string {
+	dimensions := make(map[string]string, len(attributes))
+	if len(attributes) == 0 {
+		return dimensions
+	}
+	for _, kv := range attributes {
+		v := stringifyAnyValue(kv.GetValue())
+		if v == "" {
+			// Don't bother setting things that serialize to nothing
+			continue
+		}
+
+		dimensions[kv.Key] = v
+	}
+	return dimensions
+}
+
+func stringifyAnyValue(a *commonv1.AnyValue) string {
+	var v string
+	if a == nil {
+		return ""
+	}
+	switch a.GetValue().(type) {
+	case *commonv1.AnyValue_StringValue:
+		v = a.GetStringValue()
+
+	case *commonv1.AnyValue_BoolValue:
+		v = strconv.FormatBool(a.GetBoolValue())
+
+	case *commonv1.AnyValue_DoubleValue:
+		v = float64ToDimValue(a.GetDoubleValue())
+
+	case *commonv1.AnyValue_IntValue:
+		v = strconv.FormatInt(a.GetIntValue(), 10)
+
+	case *commonv1.AnyValue_KvlistValue, *commonv1.AnyValue_ArrayValue:
+		jsonStr, _ := json.Marshal(anyValueToRaw(a))
+		v = string(jsonStr)
+	}
+
+	return v
+}
+
+func anyValueToRaw(a *commonv1.AnyValue) interface{} {
+	var v interface{}
+	if a == nil {
+		return nil
+	}
+	switch a.GetValue().(type) {
+	case *commonv1.AnyValue_StringValue:
+		v = a.GetStringValue()
+
+	case *commonv1.AnyValue_BoolValue:
+		v = a.GetBoolValue()
+
+	case *commonv1.AnyValue_DoubleValue:
+		v = a.GetDoubleValue()
+
+	case *commonv1.AnyValue_IntValue:
+		v = a.GetIntValue()
+
+	case *commonv1.AnyValue_KvlistValue:
+		kvl := a.GetKvlistValue()
+		tv := make(map[string]interface{}, len(kvl.Values))
+		for _, kv := range kvl.Values {
+			tv[kv.Key] = anyValueToRaw(kv.Value)
+		}
+		v = tv
+
+	case *commonv1.AnyValue_ArrayValue:
+		av := a.GetArrayValue()
+		tv := make([]interface{}, len(av.Values))
+		for i := range av.Values {
+			tv[i] = anyValueToRaw(av.Values[i])
+		}
+		v = tv
+	}
+	return v
+}
+
+// float64ToDimValue is equivalent to strconv.FormatFloat(f, 'g', -1, 64), but hardcodes a few common cases for increased efficiency.
+func float64ToDimValue(f float64) string {
+	// Parameters below are the same used by Prometheus
+	// see https://github.com/prometheus/common/blob/b5fe7d854c42dc7842e48d1ca58f60feae09d77b/expfmt/text_create.go#L450
+	// The SignalFx agent uses a different pattern:
+	// https://github.com/signalfx/signalfx-agent/blob/5779a3de0c9861fa07316fd11b3c4ff38c0d78f0/internal/monitors/prometheusexporter/conversion.go#L77
+	// The important issue here is consistency with the exporter; this opts for
+	// the more common pattern, used by Prometheus.
+	switch {
+	case f == 0:
+		return "0"
+	case f == 1:
+		return "1"
+	case math.IsInf(f, +1):
+		return "+Inf"
+	default:
+		return strconv.FormatFloat(f, 'g', -1, 64)
+	}
+}
+
+func numberToSignalFxValue(in *metricsv1.NumberDataPoint) datapoint.Value {
+	v := in.GetValue()
+	switch n := v.(type) {
+	case *metricsv1.NumberDataPoint_AsDouble:
+		return datapoint.NewFloatValue(n.AsDouble)
+	case *metricsv1.NumberDataPoint_AsInt:
+		return datapoint.NewIntValue(n.AsInt)
+	}
+	return nil
+}
+
+func mergeStringMaps(ms ...map[string]string) map[string]string {
+	out := make(map[string]string)
+	for _, m := range ms {
+		for k, v := range m {
+			out[k] = v
+		}
+	}
+	return out
+}
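+
+// Attribute stringification example (illustrative): a kvlist attribute value
+// {"k1": "n1", "k2": false} becomes the dimension value `{"k1":"n1","k2":false}`
+// (json.Marshal sorts map keys), an array ["n1", "n2"] becomes `["n1","n2"]`,
+// and attributes whose values stringify to "" are dropped entirely.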
"github.com/smartystreets/goconvey/convey" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" + resourcev1 "go.opentelemetry.io/proto/otlp/resource/v1" +) + +const ( + unixSecs = int64(1574092046) + unixNSecs = int64(11 * time.Millisecond) + tsMSecs = unixSecs*1e3 + unixNSecs/1e6 +) + +var ts = time.Unix(unixSecs, unixNSecs) + +func Test_FromMetrics(t *testing.T) { + labelMap := map[string]string{ + "k0": "v0", + "k1": "v1", + } + + const doubleVal = 1234.5678 + makeDoublePt := func() *metricsv1.NumberDataPoint { + return &metricsv1.NumberDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: &metricsv1.NumberDataPoint_AsDouble{AsDouble: doubleVal}, + } + } + + makeDoublePtWithLabels := func() *metricsv1.NumberDataPoint { + pt := makeDoublePt() + pt.Attributes = stringMapToAttributeMap(labelMap) + return pt + } + + const int64Val = int64(123) + makeInt64Pt := func() *metricsv1.NumberDataPoint { + return &metricsv1.NumberDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: &metricsv1.NumberDataPoint_AsInt{AsInt: int64Val}, + } + } + + makeInt64PtWithLabels := func() *metricsv1.NumberDataPoint { + pt := makeInt64Pt() + pt.Attributes = stringMapToAttributeMap(labelMap) + return pt + } + + makeNilValuePt := func() *metricsv1.NumberDataPoint { + return &metricsv1.NumberDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: nil, + } + } + + histBounds := []float64{1, 2, 4} + histCounts := []uint64{4, 2, 3, 7} + + makeDoubleHistDP := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100.0, + ExplicitBounds: histBounds, + BucketCounts: histCounts, + Attributes: stringMapToAttributeMap(labelMap), + } + } + doubleHistDP := makeDoubleHistDP() + + makeDoubleHistDPBadCounts := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100.0, + ExplicitBounds: histBounds, + BucketCounts: []uint64{4}, + Attributes: stringMapToAttributeMap(labelMap), + } + } + + makeIntHistDP := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100, + ExplicitBounds: histBounds, + BucketCounts: histCounts, + Attributes: stringMapToAttributeMap(labelMap), + } + } + intHistDP := makeIntHistDP() + + makeIntHistDPBadCounts := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100, + ExplicitBounds: histBounds, + BucketCounts: []uint64{4}, + Attributes: stringMapToAttributeMap(labelMap), + } + } + + makeHistDPNoBuckets := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + Count: 2, + Sum: 10, + TimeUnixNano: uint64(ts.UnixNano()), + Attributes: stringMapToAttributeMap(labelMap), + } + } + histDPNoBuckets := makeHistDPNoBuckets() + + const summarySumVal = 123.4 + const summaryCountVal = 111 + + makeSummaryDP := func() *metricsv1.SummaryDataPoint { + summaryDP := &metricsv1.SummaryDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Sum: summarySumVal, + Count: summaryCountVal, + Attributes: stringMapToAttributeMap(labelMap), + } + for i := 0; i < 4; i++ { + summaryDP.QuantileValues = append(summaryDP.QuantileValues, &metricsv1.SummaryDataPoint_ValueAtQuantile{ + Quantile: 0.25 * float64(i+1), + Value: float64(i), + }) 
+		}
+		return summaryDP
+	}
+
+	makeEmptySummaryDP := func() *metricsv1.SummaryDataPoint {
+		return &metricsv1.SummaryDataPoint{
+			TimeUnixNano: uint64(ts.UnixNano()),
+			Sum:          summarySumVal,
+			Count:        summaryCountVal,
+			Attributes:   stringMapToAttributeMap(labelMap),
+		}
+	}
+
+	tests := []struct {
+		name              string
+		metricsFn         func() []*metricsv1.ResourceMetrics
+		wantSfxDataPoints []*datapoint.Datapoint
+	}{
+		{
+			name: "nil_node_nil_resources_no_dims",
+			metricsFn: func() []*metricsv1.ResourceMetrics {
+				out := &metricsv1.ResourceMetrics{}
+				ilm := &metricsv1.InstrumentationLibraryMetrics{}
+				out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm)
+
+				ilm.Metrics = []*metricsv1.Metric{
+					{
+						Name: "gauge_double_with_no_dims",
+						Data: &metricsv1.Metric_Gauge{
+							Gauge: &metricsv1.Gauge{
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeDoublePt(),
+								},
+							},
+						},
+					},
+					{
+						Name: "gauge_int_with_no_dims",
+						Data: &metricsv1.Metric_Gauge{
+							Gauge: &metricsv1.Gauge{
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeInt64Pt(),
+								},
+							},
+						},
+					},
+					{
+						Name: "cumulative_double_with_no_dims",
+						Data: &metricsv1.Metric_Sum{
+							Sum: &metricsv1.Sum{
+								IsMonotonic:            true,
+								AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeDoublePt(),
+								},
+							},
+						},
+					},
+					{
+						Name: "cumulative_int_with_no_dims",
+						Data: &metricsv1.Metric_Sum{
+							Sum: &metricsv1.Sum{
+								IsMonotonic:            true,
+								AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeInt64Pt(),
+								},
+							},
+						},
+					},
+					{
+						Name: "delta_double_with_no_dims",
+						Data: &metricsv1.Metric_Sum{
+							Sum: &metricsv1.Sum{
+								IsMonotonic:            true,
+								AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeDoublePt(),
+								},
+							},
+						},
+					},
+					{
+						Name: "delta_int_with_no_dims",
+						Data: &metricsv1.Metric_Sum{
+							Sum: &metricsv1.Sum{
+								IsMonotonic:            true,
+								AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeInt64Pt(),
+								},
+							},
+						},
+					},
+					{
+						Name: "gauge_sum_double_with_no_dims",
+						Data: &metricsv1.Metric_Sum{
+							Sum: &metricsv1.Sum{
+								IsMonotonic: false,
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeDoublePt(),
+								},
+							},
+						},
+					},
+					{
+						Name: "gauge_sum_int_with_no_dims",
+						Data: &metricsv1.Metric_Sum{
+							Sum: &metricsv1.Sum{
+								IsMonotonic: false,
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeInt64Pt(),
+								},
+							},
+						},
+					},
+					{
+						Name: "gauge_sum_int_with_nil_value",
+						Data: &metricsv1.Metric_Sum{
+							Sum: &metricsv1.Sum{
+								IsMonotonic: false,
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeNilValuePt(),
+								},
+							},
+						},
+					},
+					{
+						Name: "nil_data",
+						Data: nil,
+					},
+				}
+
+				return []*metricsv1.ResourceMetrics{out}
+			},
+			wantSfxDataPoints: []*datapoint.Datapoint{
+				doubleSFxDataPoint("gauge_double_with_no_dims", datapoint.Gauge, nil, doubleVal),
+				int64SFxDataPoint("gauge_int_with_no_dims", datapoint.Gauge, nil, int64Val),
+				doubleSFxDataPoint("cumulative_double_with_no_dims", datapoint.Counter, nil, doubleVal),
+				int64SFxDataPoint("cumulative_int_with_no_dims", datapoint.Counter, nil, int64Val),
+				doubleSFxDataPoint("delta_double_with_no_dims", datapoint.Count, nil, doubleVal),
+				int64SFxDataPoint("delta_int_with_no_dims", datapoint.Count, nil, int64Val),
+				doubleSFxDataPoint("gauge_sum_double_with_no_dims", datapoint.Gauge, nil, doubleVal),
+				int64SFxDataPoint("gauge_sum_int_with_no_dims", datapoint.Gauge, nil, int64Val),
+				&datapoint.Datapoint{
+					Metric:     "gauge_sum_int_with_nil_value",
+					Timestamp:  ts,
+					Value:      nil,
+					MetricType: datapoint.Gauge,
+					Dimensions: map[string]string{},
+				},
+			},
+		},
+		{
+			name: "nil_node_and_resources_with_dims",
+			metricsFn: func() []*metricsv1.ResourceMetrics {
+				out := &metricsv1.ResourceMetrics{}
+				ilm := &metricsv1.InstrumentationLibraryMetrics{}
+				out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm)
+
+				ilm.Metrics = []*metricsv1.Metric{
+					{
+						Name: "gauge_double_with_dims",
+						Data: &metricsv1.Metric_Gauge{
+							Gauge: &metricsv1.Gauge{
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeDoublePtWithLabels(),
+								},
+							},
+						},
+					},
+					{
+						Name: "gauge_int_with_dims",
+						Data: &metricsv1.Metric_Gauge{
+							Gauge: &metricsv1.Gauge{
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeInt64PtWithLabels(),
+								},
+							},
+						},
+					},
+					{
+						Name: "cumulative_double_with_dims",
+						Data: &metricsv1.Metric_Sum{
+							Sum: &metricsv1.Sum{
+								IsMonotonic:            true,
+								AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeDoublePtWithLabels(),
+								},
+							},
+						},
+					},
+					{
+						Name: "cumulative_int_with_dims",
+						Data: &metricsv1.Metric_Sum{
+							Sum: &metricsv1.Sum{
+								IsMonotonic:            true,
+								AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeInt64PtWithLabels(),
+								},
+							},
+						},
+					},
+				}
+
+				return []*metricsv1.ResourceMetrics{out}
+			},
+			wantSfxDataPoints: []*datapoint.Datapoint{
+				doubleSFxDataPoint("gauge_double_with_dims", datapoint.Gauge, labelMap, doubleVal),
+				int64SFxDataPoint("gauge_int_with_dims", datapoint.Gauge, labelMap, int64Val),
+				doubleSFxDataPoint("cumulative_double_with_dims", datapoint.Counter, labelMap, doubleVal),
+				int64SFxDataPoint("cumulative_int_with_dims", datapoint.Counter, labelMap, int64Val),
+			},
+		},
+		{
+			name: "with_node_resources_dims",
+			metricsFn: func() []*metricsv1.ResourceMetrics {
+				out := &metricsv1.ResourceMetrics{
+					Resource: &resourcev1.Resource{
+						Attributes: []*commonv1.KeyValue{
+							{
+								Key:   "k_r0",
+								Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_r0"}},
+							},
+							{
+								Key:   "k_r1",
+								Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_r1"}},
+							},
+							{
+								Key:   "k_n0",
+								Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_n0"}},
+							},
+							{
+								Key:   "k_n1",
+								Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_n1"}},
+							},
+						},
+					},
+				}
+				ilm := &metricsv1.InstrumentationLibraryMetrics{}
+				out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm)
+
+				ilm.Metrics = []*metricsv1.Metric{
+					{
+						Name: "gauge_double_with_dims",
+						Data: &metricsv1.Metric_Gauge{
+							Gauge: &metricsv1.Gauge{
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeDoublePtWithLabels(),
+								},
+							},
+						},
+					},
+					{
+						Name: "gauge_int_with_dims",
+						Data: &metricsv1.Metric_Gauge{
+							Gauge: &metricsv1.Gauge{
+								DataPoints: []*metricsv1.NumberDataPoint{
+									makeInt64PtWithLabels(),
+								},
+							},
+						},
+					},
+				}
+				return []*metricsv1.ResourceMetrics{out}
+			},
+			wantSfxDataPoints: []*datapoint.Datapoint{
+				doubleSFxDataPoint(
+					"gauge_double_with_dims",
+					datapoint.Gauge,
+					mergeStringMaps(map[string]string{
+						"k_n0": "v_n0",
+						"k_n1": "v_n1",
+						"k_r0": "v_r0",
+						"k_r1": "v_r1",
+					}, labelMap),
+					doubleVal),
+				int64SFxDataPoint(
+					"gauge_int_with_dims",
+					datapoint.Gauge,
+					mergeStringMaps(map[string]string{
+						"k_n0": "v_n0",
+						"k_n1": "v_n1",
+						"k_r0": "v_r0",
+						"k_r1": "v_r1",
+					}, labelMap),
+					int64Val),
+			},
+		},
+		{
+			name: "histograms",
+			metricsFn: func() []*metricsv1.ResourceMetrics {
+				out := &metricsv1.ResourceMetrics{}
+				ilm := &metricsv1.InstrumentationLibraryMetrics{}
+				out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm)
+
+				ilm.Metrics = []*metricsv1.Metric{
+					{
+						Name: "int_histo",
+						Data: &metricsv1.Metric_Histogram{
+							Histogram: &metricsv1.Histogram{
+								DataPoints: []*metricsv1.HistogramDataPoint{
+									makeIntHistDP(),
+								},
+							},
+						},
+					},
+					{
+						Name: "int_delta_histo",
+						Data: &metricsv1.Metric_Histogram{
+							Histogram: &metricsv1.Histogram{
+								AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
+								DataPoints: []*metricsv1.HistogramDataPoint{
+									makeIntHistDP(),
+								},
+							},
+						},
+					},
+					{
+						Name: "double_histo",
+						Data: &metricsv1.Metric_Histogram{
+							Histogram: &metricsv1.Histogram{
+								DataPoints: []*metricsv1.HistogramDataPoint{
+									makeDoubleHistDP(),
+								},
+							},
+						},
+					},
+					{
+						Name: "double_delta_histo",
+						Data: &metricsv1.Metric_Histogram{
+							Histogram: &metricsv1.Histogram{
+								AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
+								DataPoints: []*metricsv1.HistogramDataPoint{
+									makeDoubleHistDP(),
+								},
+							},
+						},
+					},
+					{
+						Name: "double_histo_bad_counts",
+						Data: &metricsv1.Metric_Histogram{
+							Histogram: &metricsv1.Histogram{
+								DataPoints: []*metricsv1.HistogramDataPoint{
+									makeDoubleHistDPBadCounts(),
+								},
+							},
+						},
+					},
+					{
+						Name: "int_histo_bad_counts",
+						Data: &metricsv1.Metric_Histogram{
+							Histogram: &metricsv1.Histogram{
+								DataPoints: []*metricsv1.HistogramDataPoint{
+									makeIntHistDPBadCounts(),
+								},
+							},
+						},
+					},
+				}
+				return []*metricsv1.ResourceMetrics{out}
+			},
+			wantSfxDataPoints: mergeDPs(
+				expectedFromHistogram("int_histo", labelMap, *intHistDP, false),
+				expectedFromHistogram("int_delta_histo", labelMap, *intHistDP, true),
+				expectedFromHistogram("double_histo", labelMap, *doubleHistDP, false),
+				expectedFromHistogram("double_delta_histo", labelMap, *doubleHistDP, true),
+				[]*datapoint.Datapoint{
+					int64SFxDataPoint("double_histo_bad_counts_count", datapoint.Counter, labelMap, int64(doubleHistDP.Count)),
+					doubleSFxDataPoint("double_histo_bad_counts", datapoint.Counter, labelMap, doubleHistDP.Sum),
+				},
+				[]*datapoint.Datapoint{
+					int64SFxDataPoint("int_histo_bad_counts_count", datapoint.Counter, labelMap, int64(intHistDP.Count)),
+					doubleSFxDataPoint("int_histo_bad_counts", datapoint.Counter, labelMap, intHistDP.Sum),
+				},
+			),
+		},
+		{
+			name: "distribution_no_buckets",
+			metricsFn: func() []*metricsv1.ResourceMetrics {
+				out := &metricsv1.ResourceMetrics{}
+				ilm := &metricsv1.InstrumentationLibraryMetrics{}
+				out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm)
+
+				ilm.Metrics = []*metricsv1.Metric{
+					{
+						Name: "no_bucket_histo",
+						Data: &metricsv1.Metric_Histogram{
+							Histogram: &metricsv1.Histogram{
+								DataPoints: []*metricsv1.HistogramDataPoint{
+									makeHistDPNoBuckets(),
+								},
+							},
+						},
+					},
+				}
+				return []*metricsv1.ResourceMetrics{out}
+			},
+			wantSfxDataPoints: expectedFromHistogram("no_bucket_histo", labelMap, *histDPNoBuckets, false),
+		},
+		{
+			name: "summaries",
+			metricsFn: func() []*metricsv1.ResourceMetrics {
+				out := &metricsv1.ResourceMetrics{}
+				ilm := &metricsv1.InstrumentationLibraryMetrics{}
+				out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm)
+
+				ilm.Metrics = []*metricsv1.Metric{
+					{
+						Name: "summary",
+						Data: &metricsv1.Metric_Summary{
+							Summary: &metricsv1.Summary{
+								DataPoints: []*metricsv1.SummaryDataPoint{
+									makeSummaryDP(),
+								},
+							},
+						},
+					},
+				}
+				return []*metricsv1.ResourceMetrics{out}
+			},
+			wantSfxDataPoints: expectedFromSummary("summary", labelMap, summaryCountVal, summarySumVal),
+		},
+		{
+			name: "empty_summary",
+			metricsFn: func() []*metricsv1.ResourceMetrics {
+				out := &metricsv1.ResourceMetrics{}
+				ilm := &metricsv1.InstrumentationLibraryMetrics{}
+				out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm)
+
+				ilm.Metrics = []*metricsv1.Metric{
+					{
+						Name: "empty_summary",
+						Data: &metricsv1.Metric_Summary{
+							Summary: &metricsv1.Summary{
+								DataPoints: []*metricsv1.SummaryDataPoint{
+									makeEmptySummaryDP(),
+								},
+							},
+						},
+					},
+				}
+				return []*metricsv1.ResourceMetrics{out}
+			},
+			wantSfxDataPoints: expectedFromEmptySummary("empty_summary", labelMap, summaryCountVal, summarySumVal),
+		},
+	}
+	for _, tt := range tests {
+		Convey(tt.name, t, func() {
+			rms := tt.metricsFn()
+			gotSfxDataPoints := FromOTLPMetricRequest(&metricsservicev1.ExportMetricsServiceRequest{ResourceMetrics: rms})
+			So(gotSfxDataPoints, ShouldResemble, tt.wantSfxDataPoints)
+		})
+	}
+}
+
+func TestAttributesToDimensions(t *testing.T) {
+	Convey("attributesToDimensions", t, func() {
+		attrs := []*commonv1.KeyValue{
+			{
+				Key:   "a",
+				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "s"}},
+			},
+			{
+				Key:   "b",
+				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: ""}},
+			},
+			{
+				Key:   "c",
+				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BoolValue{BoolValue: true}},
+			},
+			{
+				Key:   "d",
+				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_IntValue{IntValue: 44}},
+			},
+			{
+				Key:   "e",
+				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 45.1}},
+			},
+			{
+				Key: "f",
+				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_ArrayValue{ArrayValue: &commonv1.ArrayValue{
+					Values: []*commonv1.AnyValue{
+						{Value: &commonv1.AnyValue_StringValue{StringValue: "n1"}},
+						{Value: &commonv1.AnyValue_StringValue{StringValue: "n2"}},
+					}}}},
+			},
+			{
+				Key: "g",
+				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_KvlistValue{KvlistValue: &commonv1.KeyValueList{
+					Values: []*commonv1.KeyValue{
+						{Key: "k1", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "n1"}}},
+						{Key: "k2", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BoolValue{BoolValue: false}}},
+						{Key: "k3", Value: nil},
+						{Key: "k4", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 40.3}}},
+						{Key: "k5", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_IntValue{IntValue: 41}}},
+					}}}},
+			},
+			{
+				Key:   "h",
+				Value: nil,
+			},
+			{
+				Key:   "i",
+				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 0}},
+			},
+		}
+
+		dims := attributesToDimensions(attrs)
+		So(dims, ShouldResemble, map[string]string{
+			"a": "s",
+			"c": "true",
+			"d": "44",
+			"e": "45.1",
+			"f": `["n1","n2"]`,
+			"g": `{"k1":"n1","k2":false,"k3":null,"k4":40.3,"k5":41}`,
+			"i": "0",
+		})
+	})
+}
+
+func doubleSFxDataPoint(
+	metric string,
+	metricType datapoint.MetricType,
+	dims map[string]string,
+	val float64,
+) *datapoint.Datapoint {
+	return &datapoint.Datapoint{
+		Metric:     metric,
+		Timestamp:  ts,
+		Value:      datapoint.NewFloatValue(val),
+		MetricType: metricType,
+		Dimensions: cloneStringMap(dims),
+	}
+}
+
+func int64SFxDataPoint(
+	metric string,
+	metricType datapoint.MetricType,
+	dims map[string]string,
+	val int64,
+) *datapoint.Datapoint {
+	return &datapoint.Datapoint{
+		Metric:     metric,
+		Timestamp:  ts,
+		Value:      datapoint.NewIntValue(val),
+		MetricType: metricType,
+		Dimensions: cloneStringMap(dims),
+	}
+}
+
+func expectedFromHistogram(
+	metricName string,
+	dims map[string]string,
+	histDP metricsv1.HistogramDataPoint,
+	isDelta bool,
+) []*datapoint.Datapoint {
+	buckets := histDP.GetBucketCounts()
+
+	dps := make([]*datapoint.Datapoint, 0)
+
+	typ := datapoint.Counter
+	if isDelta {
+		typ = datapoint.Count
+	}
+
+	dps = append(dps,
+		int64SFxDataPoint(metricName+"_count", typ, dims, int64(histDP.GetCount())),
+		doubleSFxDataPoint(metricName, typ, dims, histDP.GetSum()))
+
+	explicitBounds := histDP.GetExplicitBounds()
+	if explicitBounds == nil {
+		return dps
+	}
+	for i := 0; i < len(explicitBounds); i++ {
+		dimsCopy := cloneStringMap(dims)
+		dimsCopy[upperBoundDimensionKey] = float64ToDimValue(explicitBounds[i])
+		dps = append(dps, int64SFxDataPoint(metricName+"_bucket", typ, dimsCopy, int64(buckets[i])))
+	}
+	dimsCopy := cloneStringMap(dims)
+	dimsCopy[upperBoundDimensionKey] = float64ToDimValue(math.Inf(1))
+	dps = append(dps, int64SFxDataPoint(metricName+"_bucket", typ, dimsCopy, int64(buckets[len(buckets)-1])))
+	return dps
+}
+
+func expectedFromSummary(name string, labelMap map[string]string, count int64, sumVal float64) []*datapoint.Datapoint {
+	countName := name + "_count"
+	countPt := int64SFxDataPoint(countName, datapoint.Counter, labelMap, count)
+	sumPt := doubleSFxDataPoint(name, datapoint.Counter, labelMap, sumVal)
+	out := []*datapoint.Datapoint{countPt, sumPt}
+	quantileDimVals := []string{"0.25", "0.5", "0.75", "1"}
+	for i := 0; i < 4; i++ {
+		qDims := map[string]string{"quantile": quantileDimVals[i]}
+		qPt := doubleSFxDataPoint(
+			name+"_quantile",
+			datapoint.Gauge,
+			mergeStringMaps(labelMap, qDims),
+			float64(i),
+		)
+		out = append(out, qPt)
+	}
+	return out
+}
+
+func expectedFromEmptySummary(name string, labelMap map[string]string, count int64, sumVal float64) []*datapoint.Datapoint {
+	countName := name + "_count"
+	countPt := int64SFxDataPoint(countName, datapoint.Counter, labelMap, count)
+	sumPt := doubleSFxDataPoint(name, datapoint.Counter, labelMap, sumVal)
+	return []*datapoint.Datapoint{countPt, sumPt}
+}
+
+func mergeDPs(dps ...[]*datapoint.Datapoint) []*datapoint.Datapoint {
+	var out []*datapoint.Datapoint
+	for i := range dps {
+		out = append(out, dps[i]...)
+	}
+	return out
+}
+
+func cloneStringMap(m map[string]string) map[string]string {
+	out := make(map[string]string)
+	for k, v := range m {
+		out[k] = v
+	}
+	return out
+}
+
+func stringMapToAttributeMap(m map[string]string) []*commonv1.KeyValue {
+	ret := make([]*commonv1.KeyValue, 0, len(m))
+	for k, v := range m {
+		ret = append(ret, &commonv1.KeyValue{
+			Key:   k,
+			Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: v}},
+		})
+	}
+	return ret
+}
diff --git a/protocol/signalfx/datapoint.go b/protocol/signalfx/datapoint.go
index 1406357..6ecc5f6 100644
--- a/protocol/signalfx/datapoint.go
+++ b/protocol/signalfx/datapoint.go
@@ -25,6 +25,7 @@ import (
 	"github.com/signalfx/golib/v3/sfxclient"
 	"github.com/signalfx/golib/v3/web"
 	"github.com/signalfx/ingest-protocols/logkey"
+	"github.com/signalfx/ingest-protocols/protocol"
 	signalfxformat "github.com/signalfx/ingest-protocols/protocol/signalfx/format"
 )
 
@@ -138,7 +139,7 @@ func (decoder *ProtobufDecoderV2) Read(ctx context.Context, req *http.Request) (
 	jeff := buffs.Get().(*bytes.Buffer)
 	defer buffs.Put(jeff)
 	jeff.Reset()
-	if err = readFromRequest(jeff, req, decoder.Logger); err != nil {
+	if err = protocol.ReadFromRequest(jeff, req, decoder.Logger); err != nil {
 		return err
 	}
 	var msg sfxmodel.DataPointUploadMessage
diff --git a/protocol/signalfx/event.go b/protocol/signalfx/event.go
index 6a7e4d3..228df61 100644
--- a/protocol/signalfx/event.go
+++ b/protocol/signalfx/event.go
@@ -15,6 +15,7 @@ import (
 	"github.com/signalfx/golib/v3/pointer"
 	"github.com/signalfx/golib/v3/sfxclient"
 	"github.com/signalfx/golib/v3/web"
+	"github.com/signalfx/ingest-protocols/protocol"
 	signalfxformat "github.com/signalfx/ingest-protocols/protocol/signalfx/format"
 )
 
@@ -28,7 +29,7 @@ func (decoder *ProtobufEventDecoderV2) Read(ctx context.Context, req *http.Request) (
 	jeff := buffs.Get().(*bytes.Buffer)
 	defer buffs.Put(jeff)
 	jeff.Reset()
-	if err = readFromRequest(jeff, req, decoder.Logger); err != nil {
+	if err = protocol.ReadFromRequest(jeff, req, decoder.Logger); err != nil {
 		return err
 	}
 	var msg sfxmodel.EventUploadMessage
diff --git a/protocol/signalfx/helper.go b/protocol/util.go
similarity index 70%
rename from protocol/signalfx/helper.go
rename to protocol/util.go
index f19aa85..29067ef 100644
--- a/protocol/signalfx/helper.go
+++ b/protocol/util.go
@@ -1,4 +1,4 @@
-package signalfx
+package protocol
 
 import (
 	"bytes"
@@ -8,7 +8,8 @@ import (
 	"github.com/signalfx/ingest-protocols/logkey"
 )
 
-func readFromRequest(jeff *bytes.Buffer, req *http.Request, logger log.Logger) error {
+// ReadFromRequest reads the entire HTTP request body into the jeff buffer and logs any read error.
+func ReadFromRequest(jeff *bytes.Buffer, req *http.Request, logger log.Logger) error {
 	// for compressed transactions, contentLength isn't trustworthy
 	readLen, err := jeff.ReadFrom(req.Body)
 	if err != nil {
diff --git a/protocol/util_test.go b/protocol/util_test.go
new file mode 100644
index 0000000..f971c69
--- /dev/null
+++ b/protocol/util_test.go
@@ -0,0 +1,51 @@
+package protocol
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"net/http"
+	"testing"
+
+	"github.com/signalfx/golib/v3/log"
+	. "github.com/smartystreets/goconvey/convey"
"github.com/smartystreets/goconvey/convey" +) + +var errReadErr = errors.New("could not read") + +type testReader struct { + content []byte + err error +} + +func (tr *testReader) Read(b []byte) (int, error) { + if tr.err != nil { + return 0, tr.err + } + n := copy(b, tr.content) + return n, io.EOF +} + +func TestReadFromRequest(t *testing.T) { + Convey("ReadFromRequest", t, func() { + reader := &testReader{} + var req http.Request + out := bytes.NewBuffer([]byte{}) + + Convey("good read", func() { + reader.content = []byte{0x05} + req.Body = io.NopCloser(reader) + req.ContentLength = 1 + err := ReadFromRequest(out, &req, log.Discard) + So(err, ShouldBeNil) + So(out.Bytes(), ShouldResemble, []byte{0x05}) + }) + + Convey("bad read", func() { + reader.err = errReadErr + req.Body = io.NopCloser(reader) + err := ReadFromRequest(out, &req, log.Discard) + So(err, ShouldEqual, errReadErr) + }) + }) +}