From 3989fb6d94479ec8ae2757f3b8e7efd86aa5c564 Mon Sep 17 00:00:00 2001 From: Ben Keith Date: Fri, 18 Feb 2022 13:53:53 -0500 Subject: [PATCH] Add OTLP translation (#36) This adds the protocol/otlp package that can translate from OTLP metric types to SignalFx datapoints. It follows the same conversion process as the OTEL collector, but that code cannot be reused due to the internal format used within the OTEL Collector. This also adds a decoder that reads from an HTTP request and sends to a sink. --- go.mod | 5 +- go.sum | 10 +- protocol/otlp/decoder.go | 52 ++ protocol/otlp/decoder_test.go | 99 +++ protocol/otlp/metrics.go | 358 ++++++++++ protocol/otlp/metrics_test.go | 791 +++++++++++++++++++++++ protocol/signalfx/datapoint.go | 3 +- protocol/signalfx/event.go | 3 +- protocol/{signalfx/helper.go => util.go} | 5 +- protocol/util_test.go | 51 ++ 10 files changed, 1370 insertions(+), 7 deletions(-) create mode 100644 protocol/otlp/decoder.go create mode 100644 protocol/otlp/decoder_test.go create mode 100644 protocol/otlp/metrics.go create mode 100644 protocol/otlp/metrics_test.go rename protocol/{signalfx/helper.go => util.go} (70%) create mode 100644 protocol/util_test.go diff --git a/go.mod b/go.mod index 872d382..ee8e284 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,9 @@ require ( github.com/smartystreets/assertions v1.0.1 github.com/smartystreets/goconvey v1.6.4 github.com/stretchr/testify v1.7.0 - google.golang.org/grpc v1.40.0 + go.opentelemetry.io/proto/otlp v0.12.0 + google.golang.org/grpc v1.43.0 + google.golang.org/protobuf v1.27.1 ) require ( @@ -39,6 +41,5 @@ require ( golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect golang.org/x/text v0.3.6 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect - google.golang.org/protobuf v1.27.1 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index cf20b2b..1556e12 100644 --- a/go.sum +++ b/go.sum @@ -155,7 +155,11 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/containerd/containerd v1.4.3/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA= @@ -224,6 +228,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= +github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/esimonov/ifshort v1.0.1/go.mod h1:yZqNJUrNn20K8Q9n2CrjTKYyVEmX209Hgu+M1LBpeZE= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -1120,6 +1125,8 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/collector v0.28.0/go.mod h1:AP/BTXwo1eedoJO7V+HQ68CSvJU1lcdqOzJCgt1VsNs= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v0.12.0 h1:CMJ/3Wp7iOWES+CYLfnBv+DVmPbB+kmy9PJ92XvlR6c= +go.opentelemetry.io/proto/otlp v0.12.0/go.mod h1:TsIjwGWIx5VFYv9KGVlOpxoBl5Dy+63SUguV7GGvlSQ= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -1623,8 +1630,9 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= -google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.43.0 h1:Eeu7bZtDZ2DpRCsLhUlcrLnvYaMK1Gz86a+hMVvELmM= +google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/protocol/otlp/decoder.go b/protocol/otlp/decoder.go new file mode 100644 index 0000000..b6a6720 --- /dev/null +++ b/protocol/otlp/decoder.go @@ -0,0 +1,52 @@ +package otlp + +import ( + "bytes" + "context" + "net/http" + "sync" + + "github.com/signalfx/golib/v3/datapoint/dpsink" + "github.com/signalfx/golib/v3/log" + "github.com/signalfx/ingest-protocols/logkey" + "github.com/signalfx/ingest-protocols/protocol" + "github.com/signalfx/ingest-protocols/protocol/signalfx" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + "google.golang.org/protobuf/proto" +) + +type httpMetricDecoder struct { + sink dpsink.Sink + logger log.Logger + buffs sync.Pool +} + +func NewHTTPMetricDecoder(sink dpsink.Sink, logger log.Logger) signalfx.ErrorReader { + return &httpMetricDecoder{ + sink: sink, + logger: log.NewContext(logger).With(logkey.Protocol, "otlp"), + buffs: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, + } +} + +func (d *httpMetricDecoder) Read(ctx context.Context, req *http.Request) (err error) { + jeff := d.buffs.Get().(*bytes.Buffer) + defer d.buffs.Put(jeff) + jeff.Reset() + if err = protocol.ReadFromRequest(jeff, req, d.logger); err != nil { + return err + } + var msg metricsservicev1.ExportMetricsServiceRequest + if err = proto.Unmarshal(jeff.Bytes(), &msg); err != nil { + return err + } + dps := FromOTLPMetricRequest(&msg) + if len(dps) > 0 { + err = d.sink.AddDatapoints(ctx, dps) + } + return nil +} diff --git a/protocol/otlp/decoder_test.go b/protocol/otlp/decoder_test.go new file mode 100644 index 0000000..c06e02c --- /dev/null +++ b/protocol/otlp/decoder_test.go @@ -0,0 +1,99 @@ +package otlp + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "sync" + "testing" + + "github.com/signalfx/golib/v3/datapoint/dptest" + "github.com/signalfx/golib/v3/log" + . "github.com/smartystreets/goconvey/convey" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" + "google.golang.org/protobuf/proto" +) + +var errReadErr = errors.New("could not read") + +type errorReader struct{} + +func (errorReader *errorReader) Read([]byte) (int, error) { + return 0, errReadErr +} + +func TestDecoder(t *testing.T) { + Convey("httpMetricDecoder", t, func() { + sendTo := dptest.NewBasicSink() + decoder := NewHTTPMetricDecoder(sendTo, log.Discard) + + Convey("Bad request reading", func() { + req := &http.Request{ + Body: io.NopCloser(&errorReader{}), + } + req.ContentLength = 1 + ctx := context.Background() + So(decoder.Read(ctx, req), ShouldEqual, errReadErr) + }) + + Convey("Bad request content", func() { + req := &http.Request{ + Body: io.NopCloser(bytes.NewBufferString("asdf")), + } + req.ContentLength = 4 + ctx := context.Background() + So(decoder.Read(ctx, req), ShouldNotBeNil) + }) + + Convey("Good request", func(c C) { + var msg metricsservicev1.ExportMetricsServiceRequest + msg.ResourceMetrics = []*metricsv1.ResourceMetrics{ + { + InstrumentationLibraryMetrics: []*metricsv1.InstrumentationLibraryMetrics{ + { + Metrics: []*metricsv1.Metric{ + { + Name: "test", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + { + Attributes: []*commonv1.KeyValue{}, + StartTimeUnixNano: 1000, + TimeUnixNano: 1000, + Value: &metricsv1.NumberDataPoint_AsInt{AsInt: 4}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + b, _ := proto.Marshal(&msg) + req := &http.Request{ + Body: io.NopCloser(bytes.NewBuffer(b)), + } + req.ContentLength = int64(len(b)) + ctx := context.Background() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + dp := <-sendTo.PointsChan + c.So(dp, ShouldNotBeNil) + wg.Done() + }() + + So(decoder.Read(ctx, req), ShouldBeNil) + + wg.Wait() + }) + }) +} diff --git a/protocol/otlp/metrics.go b/protocol/otlp/metrics.go new file mode 100644 index 0000000..d98abf1 --- /dev/null +++ b/protocol/otlp/metrics.go @@ -0,0 +1,358 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This code is copied and modified directly from the OTEL Collector: +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/1d6309bb62264cc7e2dda076ed95385b1ddef28a/pkg/translator/signalfx/from_metrics.go + +package otlp + +import ( + "encoding/json" + "math" + "strconv" + "time" + + "github.com/signalfx/golib/v3/datapoint" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" +) + +var ( + // Some standard dimension keys. + // upper bound dimension key for histogram buckets. + upperBoundDimensionKey = "upper_bound" + + // infinity bound dimension value is used on all histograms. + infinityBoundSFxDimValue = float64ToDimValue(math.Inf(1)) +) + +// FromOTLPMetricRequest converts the ResourceMetrics in an incoming request to SignalFx datapoints +func FromOTLPMetricRequest(md *metricsservicev1.ExportMetricsServiceRequest) []*datapoint.Datapoint { + return FromOTLPResourceMetrics(md.GetResourceMetrics()) +} + +// FromMetrics converts OTLP ResourceMetrics to SignalFx datapoints. +func FromOTLPResourceMetrics(rms []*metricsv1.ResourceMetrics) []*datapoint.Datapoint { + var sfxDps []*datapoint.Datapoint + + for _, rm := range rms { + for _, ilm := range rm.GetInstrumentationLibraryMetrics() { + for _, m := range ilm.GetMetrics() { + sfxDps = append(sfxDps, FromMetric(m)...) + } + } + + extraDimensions := attributesToDimensions(rm.GetResource().GetAttributes()) + for i := range sfxDps { + dpDims := sfxDps[i].Dimensions + for k, v := range extraDimensions { + if _, ok := dpDims[k]; !ok { + dpDims[k] = v + } + } + } + } + + return sfxDps +} + +// FromMetric converts a OTLP Metric to SignalFx datapoint(s). +func FromMetric(m *metricsv1.Metric) []*datapoint.Datapoint { + var dps []*datapoint.Datapoint + + basePoint := &datapoint.Datapoint{ + Metric: m.GetName(), + MetricType: fromMetricTypeToMetricType(m), + } + + data := m.GetData() + switch data.(type) { + case *metricsv1.Metric_Gauge: + dps = convertNumberDataPoints(m.GetGauge().GetDataPoints(), basePoint) + case *metricsv1.Metric_Sum: + dps = convertNumberDataPoints(m.GetSum().GetDataPoints(), basePoint) + case *metricsv1.Metric_Histogram: + dps = convertHistogram(m.GetHistogram().GetDataPoints(), basePoint) + case *metricsv1.Metric_ExponentialHistogram: + // TODO: Add support for these + case *metricsv1.Metric_Summary: + dps = convertSummaryDataPoints(m.GetSummary().GetDataPoints(), m.GetName()) + } + + return dps +} + +func fromMetricTypeToMetricType(m *metricsv1.Metric) datapoint.MetricType { + data := m.GetData() + switch data.(type) { + case *metricsv1.Metric_Gauge: + return datapoint.Gauge + + case *metricsv1.Metric_Sum: + if !m.GetSum().GetIsMonotonic() { + return datapoint.Gauge + } + if m.GetSum().GetAggregationTemporality() == metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA { + return datapoint.Count + } + return datapoint.Counter + + case *metricsv1.Metric_Histogram: + if m.GetHistogram().GetAggregationTemporality() == metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA { + return datapoint.Count + } + return datapoint.Counter + + case *metricsv1.Metric_Summary: + return datapoint.Counter + } + + return datapoint.Gauge +} + +func convertNumberDataPoints(in []*metricsv1.NumberDataPoint, basePoint *datapoint.Datapoint) []*datapoint.Datapoint { + out := make([]*datapoint.Datapoint, 0, len(in)) + + for _, inDp := range in { + dp := *basePoint + dp.Timestamp = time.Unix(0, int64(inDp.GetTimeUnixNano())) + dp.Dimensions = attributesToDimensions(inDp.GetAttributes()) + + dp.Value = numberToSignalFxValue(inDp) + + out = append(out, &dp) + } + return out +} + +func convertHistogram(histDPs []*metricsv1.HistogramDataPoint, basePoint *datapoint.Datapoint) []*datapoint.Datapoint { + var out []*datapoint.Datapoint + + for _, histDP := range histDPs { + attrDims := attributesToDimensions(histDP.GetAttributes()) + ts := time.Unix(0, int64(histDP.GetTimeUnixNano())) + + countDP := *basePoint + countDP.Metric = basePoint.Metric + "_count" + countDP.Timestamp = ts + countDP.Dimensions = attrDims + count := int64(histDP.GetCount()) + countDP.Value = datapoint.NewIntValue(count) + + sumDP := *basePoint + sumDP.Timestamp = ts + sumDP.Dimensions = attrDims + sum := histDP.GetSum() + sumDP.Value = datapoint.NewFloatValue(sum) + + out = append(out, &countDP, &sumDP) + + bounds := histDP.GetExplicitBounds() + counts := histDP.GetBucketCounts() + + // Spec says counts is optional but if present it must have one more + // element than the bounds array. + if len(counts) > 0 && len(counts) != len(bounds)+1 { + continue + } + + for j, c := range counts { + bound := infinityBoundSFxDimValue + if j < len(bounds) { + bound = float64ToDimValue(bounds[j]) + } + + dp := *basePoint + dp.Metric = basePoint.Metric + "_bucket" + dp.Timestamp = ts + dp.Dimensions = mergeStringMaps(attrDims, map[string]string{ + upperBoundDimensionKey: bound, + }) + dp.Value = datapoint.NewIntValue(int64(c)) + + out = append(out, &dp) + } + } + + return out +} + +func convertSummaryDataPoints( + in []*metricsv1.SummaryDataPoint, + name string, +) []*datapoint.Datapoint { + out := make([]*datapoint.Datapoint, 0, len(in)) + + for _, inDp := range in { + dims := attributesToDimensions(inDp.GetAttributes()) + ts := time.Unix(0, int64(inDp.GetTimeUnixNano())) + + countPt := datapoint.Datapoint{ + Metric: name + "_count", + Timestamp: ts, + Dimensions: dims, + MetricType: datapoint.Counter, + } + c := int64(inDp.GetCount()) + countPt.Value = datapoint.NewIntValue(c) + out = append(out, &countPt) + + sumPt := datapoint.Datapoint{ + Metric: name, + Timestamp: ts, + Dimensions: dims, + MetricType: datapoint.Counter, + } + sum := inDp.GetSum() + sumPt.Value = datapoint.NewFloatValue(sum) + out = append(out, &sumPt) + + qvs := inDp.GetQuantileValues() + for _, qv := range qvs { + qPt := datapoint.Datapoint{ + Metric: name + "_quantile", + Timestamp: ts, + Dimensions: mergeStringMaps(dims, map[string]string{ + "quantile": strconv.FormatFloat(qv.GetQuantile(), 'f', -1, 64), + }), + MetricType: datapoint.Gauge, + } + qPt.Value = datapoint.NewFloatValue(qv.GetValue()) + out = append(out, &qPt) + } + } + return out +} + +func attributesToDimensions(attributes []*commonv1.KeyValue) map[string]string { + dimensions := make(map[string]string, len(attributes)) + if len(attributes) == 0 { + return dimensions + } + for _, kv := range attributes { + v := stringifyAnyValue(kv.GetValue()) + if v == "" { + // Don't bother setting things that serialize to nothing + continue + } + + dimensions[kv.Key] = v + } + return dimensions +} + +func stringifyAnyValue(a *commonv1.AnyValue) string { + var v string + if a == nil { + return "" + } + switch a.GetValue().(type) { + case *commonv1.AnyValue_StringValue: + v = a.GetStringValue() + + case *commonv1.AnyValue_BoolValue: + v = strconv.FormatBool(a.GetBoolValue()) + + case *commonv1.AnyValue_DoubleValue: + v = float64ToDimValue(a.GetDoubleValue()) + + case *commonv1.AnyValue_IntValue: + v = strconv.FormatInt(a.GetIntValue(), 10) + + case *commonv1.AnyValue_KvlistValue, *commonv1.AnyValue_ArrayValue: + jsonStr, _ := json.Marshal(anyValueToRaw(a)) + v = string(jsonStr) + } + + return v +} + +func anyValueToRaw(a *commonv1.AnyValue) interface{} { + var v interface{} + if a == nil { + return nil + } + switch a.GetValue().(type) { + case *commonv1.AnyValue_StringValue: + v = a.GetStringValue() + + case *commonv1.AnyValue_BoolValue: + v = a.GetBoolValue() + + case *commonv1.AnyValue_DoubleValue: + v = a.GetDoubleValue() + + case *commonv1.AnyValue_IntValue: + v = a.GetIntValue() + + case *commonv1.AnyValue_KvlistValue: + kvl := a.GetKvlistValue() + tv := make(map[string]interface{}, len(kvl.Values)) + for _, kv := range kvl.Values { + tv[kv.Key] = anyValueToRaw(kv.Value) + } + v = tv + + case *commonv1.AnyValue_ArrayValue: + av := a.GetArrayValue() + tv := make([]interface{}, len(av.Values)) + for i := range av.Values { + tv[i] = anyValueToRaw(av.Values[i]) + } + v = tv + } + return v +} + +// Is equivalent to strconv.FormatFloat(f, 'g', -1, 64), but hardcodes a few common cases for increased efficiency. +func float64ToDimValue(f float64) string { + // Parameters below are the same used by Prometheus + // see https://github.com/prometheus/common/blob/b5fe7d854c42dc7842e48d1ca58f60feae09d77b/expfmt/text_create.go#L450 + // SignalFx agent uses a different pattern + // https://github.com/signalfx/signalfx-agent/blob/5779a3de0c9861fa07316fd11b3c4ff38c0d78f0/internal/monitors/prometheusexporter/conversion.go#L77 + // The important issue here is consistency with the exporter, opting for the + // more common one used by Prometheus. + switch { + case f == 0: + return "0" + case f == 1: + return "1" + case math.IsInf(f, +1): + return "+Inf" + default: + return strconv.FormatFloat(f, 'g', -1, 64) + } +} + +func numberToSignalFxValue(in *metricsv1.NumberDataPoint) datapoint.Value { + v := in.GetValue() + switch n := v.(type) { + case *metricsv1.NumberDataPoint_AsDouble: + return datapoint.NewFloatValue(n.AsDouble) + case *metricsv1.NumberDataPoint_AsInt: + return datapoint.NewIntValue(n.AsInt) + } + return nil +} + +func mergeStringMaps(ms ...map[string]string) map[string]string { + out := make(map[string]string) + for _, m := range ms { + for k, v := range m { + out[k] = v + } + } + return out +} diff --git a/protocol/otlp/metrics_test.go b/protocol/otlp/metrics_test.go new file mode 100644 index 0000000..8301490 --- /dev/null +++ b/protocol/otlp/metrics_test.go @@ -0,0 +1,791 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + "math" + "testing" + "time" + + "github.com/signalfx/golib/v3/datapoint" + . "github.com/smartystreets/goconvey/convey" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" + resourcev1 "go.opentelemetry.io/proto/otlp/resource/v1" +) + +const ( + unixSecs = int64(1574092046) + unixNSecs = int64(11 * time.Millisecond) + tsMSecs = unixSecs*1e3 + unixNSecs/1e6 +) + +var ts = time.Unix(unixSecs, unixNSecs) + +func Test_FromMetrics(t *testing.T) { + labelMap := map[string]string{ + "k0": "v0", + "k1": "v1", + } + + const doubleVal = 1234.5678 + makeDoublePt := func() *metricsv1.NumberDataPoint { + return &metricsv1.NumberDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: &metricsv1.NumberDataPoint_AsDouble{AsDouble: doubleVal}, + } + } + + makeDoublePtWithLabels := func() *metricsv1.NumberDataPoint { + pt := makeDoublePt() + pt.Attributes = stringMapToAttributeMap(labelMap) + return pt + } + + const int64Val = int64(123) + makeInt64Pt := func() *metricsv1.NumberDataPoint { + return &metricsv1.NumberDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: &metricsv1.NumberDataPoint_AsInt{AsInt: int64Val}, + } + } + + makeInt64PtWithLabels := func() *metricsv1.NumberDataPoint { + pt := makeInt64Pt() + pt.Attributes = stringMapToAttributeMap(labelMap) + return pt + } + + makeNilValuePt := func() *metricsv1.NumberDataPoint { + return &metricsv1.NumberDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: nil, + } + } + + histBounds := []float64{1, 2, 4} + histCounts := []uint64{4, 2, 3, 7} + + makeDoubleHistDP := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100.0, + ExplicitBounds: histBounds, + BucketCounts: histCounts, + Attributes: stringMapToAttributeMap(labelMap), + } + } + doubleHistDP := makeDoubleHistDP() + + makeDoubleHistDPBadCounts := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100.0, + ExplicitBounds: histBounds, + BucketCounts: []uint64{4}, + Attributes: stringMapToAttributeMap(labelMap), + } + } + + makeIntHistDP := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100, + ExplicitBounds: histBounds, + BucketCounts: histCounts, + Attributes: stringMapToAttributeMap(labelMap), + } + } + intHistDP := makeIntHistDP() + + makeIntHistDPBadCounts := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100, + ExplicitBounds: histBounds, + BucketCounts: []uint64{4}, + Attributes: stringMapToAttributeMap(labelMap), + } + } + + makeHistDPNoBuckets := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + Count: 2, + Sum: 10, + TimeUnixNano: uint64(ts.UnixNano()), + Attributes: stringMapToAttributeMap(labelMap), + } + } + histDPNoBuckets := makeHistDPNoBuckets() + + const summarySumVal = 123.4 + const summaryCountVal = 111 + + makeSummaryDP := func() *metricsv1.SummaryDataPoint { + summaryDP := &metricsv1.SummaryDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Sum: summarySumVal, + Count: summaryCountVal, + Attributes: stringMapToAttributeMap(labelMap), + } + for i := 0; i < 4; i++ { + summaryDP.QuantileValues = append(summaryDP.QuantileValues, &metricsv1.SummaryDataPoint_ValueAtQuantile{ + Quantile: 0.25 * float64(i+1), + Value: float64(i), + }) + } + return summaryDP + } + + makeEmptySummaryDP := func() *metricsv1.SummaryDataPoint { + return &metricsv1.SummaryDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Sum: summarySumVal, + Count: summaryCountVal, + Attributes: stringMapToAttributeMap(labelMap), + } + } + + tests := []struct { + name string + metricsFn func() []*metricsv1.ResourceMetrics + wantSfxDataPoints []*datapoint.Datapoint + }{ + { + name: "nil_node_nil_resources_no_dims", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "gauge_double_with_no_dims", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "gauge_int_with_no_dims", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "cumulative_double_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "cumulative_int_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "delta_double_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "delta_int_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "gauge_sum_double_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: false, + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "gauge_sum_int_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: false, + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "gauge_sum_int_with_nil_value", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: false, + DataPoints: []*metricsv1.NumberDataPoint{ + makeNilValuePt(), + }, + }, + }, + }, + { + Name: "nil_data", + Data: nil, + }, + } + + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: []*datapoint.Datapoint{ + doubleSFxDataPoint("gauge_double_with_no_dims", datapoint.Gauge, nil, doubleVal), + int64SFxDataPoint("gauge_int_with_no_dims", datapoint.Gauge, nil, int64Val), + doubleSFxDataPoint("cumulative_double_with_no_dims", datapoint.Counter, nil, doubleVal), + int64SFxDataPoint("cumulative_int_with_no_dims", datapoint.Counter, nil, int64Val), + doubleSFxDataPoint("delta_double_with_no_dims", datapoint.Count, nil, doubleVal), + int64SFxDataPoint("delta_int_with_no_dims", datapoint.Count, nil, int64Val), + doubleSFxDataPoint("gauge_sum_double_with_no_dims", datapoint.Gauge, nil, doubleVal), + int64SFxDataPoint("gauge_sum_int_with_no_dims", datapoint.Gauge, nil, int64Val), + &datapoint.Datapoint{ + Metric: "gauge_sum_int_with_nil_value", + Timestamp: ts, + Value: nil, + MetricType: datapoint.Gauge, + Dimensions: map[string]string{}, + }, + }, + }, + { + name: "nil_node_and_resources_with_dims", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "gauge_double_with_dims", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePtWithLabels(), + }, + }, + }, + }, + { + Name: "gauge_int_with_dims", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64PtWithLabels(), + }, + }, + }, + }, + { + Name: "cumulative_double_with_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePtWithLabels(), + }, + }, + }, + }, + { + Name: "cumulative_int_with_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64PtWithLabels(), + }, + }, + }, + }, + } + + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: []*datapoint.Datapoint{ + doubleSFxDataPoint("gauge_double_with_dims", datapoint.Gauge, labelMap, doubleVal), + int64SFxDataPoint("gauge_int_with_dims", datapoint.Gauge, labelMap, int64Val), + doubleSFxDataPoint("cumulative_double_with_dims", datapoint.Counter, labelMap, doubleVal), + int64SFxDataPoint("cumulative_int_with_dims", datapoint.Counter, labelMap, int64Val), + }, + }, + { + name: "with_node_resources_dims", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{ + Resource: &resourcev1.Resource{ + Attributes: []*commonv1.KeyValue{ + { + Key: "k_r0", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_r0"}}, + }, + { + Key: "k_r1", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_r1"}}, + }, + { + Key: "k_n0", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_n0"}}, + }, + { + Key: "k_n1", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_n1"}}, + }, + }, + }, + } + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "gauge_double_with_dims", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePtWithLabels(), + }, + }, + }, + }, + { + Name: "gauge_int_with_dims", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64PtWithLabels(), + }, + }, + }, + }, + } + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: []*datapoint.Datapoint{ + doubleSFxDataPoint( + "gauge_double_with_dims", + datapoint.Gauge, + mergeStringMaps(map[string]string{ + "k_n0": "v_n0", + "k_n1": "v_n1", + "k_r0": "v_r0", + "k_r1": "v_r1", + }, labelMap), + doubleVal), + int64SFxDataPoint( + "gauge_int_with_dims", + datapoint.Gauge, + mergeStringMaps(map[string]string{ + "k_n0": "v_n0", + "k_n1": "v_n1", + "k_r0": "v_r0", + "k_r1": "v_r1", + }, labelMap), + int64Val), + }, + }, + { + name: "histograms", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "int_histo", + Data: &metricsv1.Metric_Histogram{ + Histogram: &metricsv1.Histogram{ + DataPoints: []*metricsv1.HistogramDataPoint{ + makeIntHistDP(), + }, + }, + }, + }, + { + Name: "int_delta_histo", + Data: &metricsv1.Metric_Histogram{ + Histogram: &metricsv1.Histogram{ + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricsv1.HistogramDataPoint{ + makeIntHistDP(), + }, + }, + }, + }, + { + Name: "double_histo", + Data: &metricsv1.Metric_Histogram{ + Histogram: &metricsv1.Histogram{ + DataPoints: []*metricsv1.HistogramDataPoint{ + makeDoubleHistDP(), + }, + }, + }, + }, + { + Name: "double_delta_histo", + Data: &metricsv1.Metric_Histogram{ + Histogram: &metricsv1.Histogram{ + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricsv1.HistogramDataPoint{ + makeDoubleHistDP(), + }, + }, + }, + }, + { + Name: "double_histo_bad_counts", + Data: &metricsv1.Metric_Histogram{ + Histogram: &metricsv1.Histogram{ + DataPoints: []*metricsv1.HistogramDataPoint{ + makeDoubleHistDPBadCounts(), + }, + }, + }, + }, + { + Name: "int_histo_bad_counts", + Data: &metricsv1.Metric_Histogram{ + Histogram: &metricsv1.Histogram{ + DataPoints: []*metricsv1.HistogramDataPoint{ + makeIntHistDPBadCounts(), + }, + }, + }, + }, + } + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: mergeDPs( + expectedFromHistogram("int_histo", labelMap, *intHistDP, false), + expectedFromHistogram("int_delta_histo", labelMap, *intHistDP, true), + expectedFromHistogram("double_histo", labelMap, *doubleHistDP, false), + expectedFromHistogram("double_delta_histo", labelMap, *doubleHistDP, true), + []*datapoint.Datapoint{ + int64SFxDataPoint("double_histo_bad_counts_count", datapoint.Counter, labelMap, int64(doubleHistDP.Count)), + doubleSFxDataPoint("double_histo_bad_counts", datapoint.Counter, labelMap, doubleHistDP.Sum), + }, + []*datapoint.Datapoint{ + int64SFxDataPoint("int_histo_bad_counts_count", datapoint.Counter, labelMap, int64(intHistDP.Count)), + doubleSFxDataPoint("int_histo_bad_counts", datapoint.Counter, labelMap, intHistDP.Sum), + }, + ), + }, + { + name: "distribution_no_buckets", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "no_bucket_histo", + Data: &metricsv1.Metric_Histogram{ + Histogram: &metricsv1.Histogram{ + DataPoints: []*metricsv1.HistogramDataPoint{ + makeHistDPNoBuckets(), + }, + }, + }, + }, + } + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: expectedFromHistogram("no_bucket_histo", labelMap, *histDPNoBuckets, false), + }, + { + name: "summaries", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "summary", + Data: &metricsv1.Metric_Summary{ + Summary: &metricsv1.Summary{ + DataPoints: []*metricsv1.SummaryDataPoint{ + makeSummaryDP(), + }, + }, + }, + }, + } + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: expectedFromSummary("summary", labelMap, summaryCountVal, summarySumVal), + }, + { + name: "empty_summary", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "empty_summary", + Data: &metricsv1.Metric_Summary{ + Summary: &metricsv1.Summary{ + DataPoints: []*metricsv1.SummaryDataPoint{ + makeEmptySummaryDP(), + }, + }, + }, + }, + } + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: expectedFromEmptySummary("empty_summary", labelMap, summaryCountVal, summarySumVal), + }, + } + for _, tt := range tests { + Convey(tt.name, t, func() { + rms := tt.metricsFn() + gotSfxDataPoints := FromOTLPMetricRequest(&metricsservicev1.ExportMetricsServiceRequest{ResourceMetrics: rms}) + So(tt.wantSfxDataPoints, ShouldResemble, gotSfxDataPoints) + }) + } +} + +func TestAttributesToDimensions(t *testing.T) { + Convey("attributesToDimensions", t, func() { + attrs := []*commonv1.KeyValue{ + { + Key: "a", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "s"}}, + }, + { + Key: "b", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: ""}}, + }, + { + Key: "c", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BoolValue{BoolValue: true}}, + }, + { + Key: "d", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_IntValue{IntValue: 44}}, + }, + { + Key: "e", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 45.1}}, + }, + { + Key: "f", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_ArrayValue{ArrayValue: &commonv1.ArrayValue{ + Values: []*commonv1.AnyValue{ + {Value: &commonv1.AnyValue_StringValue{StringValue: "n1"}}, + {Value: &commonv1.AnyValue_StringValue{StringValue: "n2"}}, + }}}}, + }, + { + Key: "g", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_KvlistValue{KvlistValue: &commonv1.KeyValueList{ + Values: []*commonv1.KeyValue{ + {Key: "k1", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "n1"}}}, + {Key: "k2", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BoolValue{BoolValue: false}}}, + {Key: "k3", Value: nil}, + {Key: "k4", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 40.3}}}, + {Key: "k5", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_IntValue{IntValue: 41}}}, + }}}}, + }, + { + Key: "h", + Value: nil, + }, + { + Key: "i", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 0}}, + }, + } + + dims := attributesToDimensions(attrs) + So(dims, ShouldResemble, map[string]string{ + "a": "s", + "c": "true", + "d": "44", + "e": "45.1", + "f": `["n1","n2"]`, + "g": `{"k1":"n1","k2":false,"k3":null,"k4":40.3,"k5":41}`, + "i": "0", + }) + }) +} + +func doubleSFxDataPoint( + metric string, + metricType datapoint.MetricType, + dims map[string]string, + val float64, +) *datapoint.Datapoint { + return &datapoint.Datapoint{ + Metric: metric, + Timestamp: ts, + Value: datapoint.NewFloatValue(val), + MetricType: metricType, + Dimensions: cloneStringMap(dims), + } +} + +func int64SFxDataPoint( + metric string, + metricType datapoint.MetricType, + dims map[string]string, + val int64, +) *datapoint.Datapoint { + return &datapoint.Datapoint{ + Metric: metric, + Timestamp: ts, + Value: datapoint.NewIntValue(val), + MetricType: metricType, + Dimensions: cloneStringMap(dims), + } +} + +func expectedFromHistogram( + metricName string, + dims map[string]string, + histDP metricsv1.HistogramDataPoint, + isDelta bool, +) []*datapoint.Datapoint { + buckets := histDP.GetBucketCounts() + + dps := make([]*datapoint.Datapoint, 0) + + typ := datapoint.Counter + if isDelta { + typ = datapoint.Count + } + + dps = append(dps, + int64SFxDataPoint(metricName+"_count", typ, dims, int64(histDP.GetCount())), + doubleSFxDataPoint(metricName, typ, dims, histDP.GetSum())) + + explicitBounds := histDP.GetExplicitBounds() + if explicitBounds == nil { + return dps + } + for i := 0; i < len(explicitBounds); i++ { + dimsCopy := cloneStringMap(dims) + dimsCopy[upperBoundDimensionKey] = float64ToDimValue(explicitBounds[i]) + dps = append(dps, int64SFxDataPoint(metricName+"_bucket", typ, dimsCopy, int64(buckets[i]))) + } + dimsCopy := cloneStringMap(dims) + dimsCopy[upperBoundDimensionKey] = float64ToDimValue(math.Inf(1)) + dps = append(dps, int64SFxDataPoint(metricName+"_bucket", typ, dimsCopy, int64(buckets[len(buckets)-1]))) + return dps +} + +func expectedFromSummary(name string, labelMap map[string]string, count int64, sumVal float64) []*datapoint.Datapoint { + countName := name + "_count" + countPt := int64SFxDataPoint(countName, datapoint.Counter, labelMap, count) + sumPt := doubleSFxDataPoint(name, datapoint.Counter, labelMap, sumVal) + out := []*datapoint.Datapoint{countPt, sumPt} + quantileDimVals := []string{"0.25", "0.5", "0.75", "1"} + for i := 0; i < 4; i++ { + qDims := map[string]string{"quantile": quantileDimVals[i]} + qPt := doubleSFxDataPoint( + name+"_quantile", + datapoint.Gauge, + mergeStringMaps(labelMap, qDims), + float64(i), + ) + out = append(out, qPt) + } + return out +} + +func expectedFromEmptySummary(name string, labelMap map[string]string, count int64, sumVal float64) []*datapoint.Datapoint { + countName := name + "_count" + countPt := int64SFxDataPoint(countName, datapoint.Counter, labelMap, count) + sumPt := doubleSFxDataPoint(name, datapoint.Counter, labelMap, sumVal) + return []*datapoint.Datapoint{countPt, sumPt} +} + +func mergeDPs(dps ...[]*datapoint.Datapoint) []*datapoint.Datapoint { + var out []*datapoint.Datapoint + for i := range dps { + out = append(out, dps[i]...) + } + return out +} + +func cloneStringMap(m map[string]string) map[string]string { + out := make(map[string]string) + for k, v := range m { + out[k] = v + } + return out +} + +func stringMapToAttributeMap(m map[string]string) []*commonv1.KeyValue { + ret := make([]*commonv1.KeyValue, 0, len(m)) + for k, v := range m { + ret = append(ret, &commonv1.KeyValue{ + Key: k, + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: v}}, + }) + } + return ret +} diff --git a/protocol/signalfx/datapoint.go b/protocol/signalfx/datapoint.go index 1406357..6ecc5f6 100644 --- a/protocol/signalfx/datapoint.go +++ b/protocol/signalfx/datapoint.go @@ -25,6 +25,7 @@ import ( "github.com/signalfx/golib/v3/sfxclient" "github.com/signalfx/golib/v3/web" "github.com/signalfx/ingest-protocols/logkey" + "github.com/signalfx/ingest-protocols/protocol" signalfxformat "github.com/signalfx/ingest-protocols/protocol/signalfx/format" ) @@ -138,7 +139,7 @@ func (decoder *ProtobufDecoderV2) Read(ctx context.Context, req *http.Request) ( jeff := buffs.Get().(*bytes.Buffer) defer buffs.Put(jeff) jeff.Reset() - if err = readFromRequest(jeff, req, decoder.Logger); err != nil { + if err = protocol.ReadFromRequest(jeff, req, decoder.Logger); err != nil { return err } var msg sfxmodel.DataPointUploadMessage diff --git a/protocol/signalfx/event.go b/protocol/signalfx/event.go index 6a7e4d3..228df61 100644 --- a/protocol/signalfx/event.go +++ b/protocol/signalfx/event.go @@ -15,6 +15,7 @@ import ( "github.com/signalfx/golib/v3/pointer" "github.com/signalfx/golib/v3/sfxclient" "github.com/signalfx/golib/v3/web" + "github.com/signalfx/ingest-protocols/protocol" signalfxformat "github.com/signalfx/ingest-protocols/protocol/signalfx/format" ) @@ -28,7 +29,7 @@ func (decoder *ProtobufEventDecoderV2) Read(ctx context.Context, req *http.Reque jeff := buffs.Get().(*bytes.Buffer) defer buffs.Put(jeff) jeff.Reset() - if err = readFromRequest(jeff, req, decoder.Logger); err != nil { + if err = protocol.ReadFromRequest(jeff, req, decoder.Logger); err != nil { return err } var msg sfxmodel.EventUploadMessage diff --git a/protocol/signalfx/helper.go b/protocol/util.go similarity index 70% rename from protocol/signalfx/helper.go rename to protocol/util.go index f19aa85..29067ef 100644 --- a/protocol/signalfx/helper.go +++ b/protocol/util.go @@ -1,4 +1,4 @@ -package signalfx +package protocol import ( "bytes" @@ -8,7 +8,8 @@ import ( "github.com/signalfx/ingest-protocols/logkey" ) -func readFromRequest(jeff *bytes.Buffer, req *http.Request, logger log.Logger) error { +// ReadFromRequest reads all the http body into the jeff buffer and logs if there is an error. +func ReadFromRequest(jeff *bytes.Buffer, req *http.Request, logger log.Logger) error { // for compressed transactions, contentLength isn't trustworthy readLen, err := jeff.ReadFrom(req.Body) if err != nil { diff --git a/protocol/util_test.go b/protocol/util_test.go new file mode 100644 index 0000000..f971c69 --- /dev/null +++ b/protocol/util_test.go @@ -0,0 +1,51 @@ +package protocol + +import ( + "bytes" + "errors" + "io" + "net/http" + "testing" + + "github.com/signalfx/golib/v3/log" + . "github.com/smartystreets/goconvey/convey" +) + +var errReadErr = errors.New("could not read") + +type testReader struct { + content []byte + err error +} + +func (tr *testReader) Read(b []byte) (int, error) { + if tr.err != nil { + return 0, tr.err + } + n := copy(b, tr.content) + return n, io.EOF +} + +func TestReadFromRequest(t *testing.T) { + Convey("ReadFromRequest", t, func() { + reader := &testReader{} + var req http.Request + out := bytes.NewBuffer([]byte{}) + + Convey("good read", func() { + reader.content = []byte{0x05} + req.Body = io.NopCloser(reader) + req.ContentLength = 1 + err := ReadFromRequest(out, &req, log.Discard) + So(err, ShouldBeNil) + So(out.Bytes(), ShouldResemble, []byte{0x05}) + }) + + Convey("bad read", func() { + reader.err = errReadErr + req.Body = io.NopCloser(reader) + err := ReadFromRequest(out, &req, log.Discard) + So(err, ShouldEqual, errReadErr) + }) + }) +}