diff --git a/go.mod b/go.mod index 872d382..4c889b2 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/smartystreets/assertions v1.0.1 github.com/smartystreets/goconvey v1.6.4 github.com/stretchr/testify v1.7.0 + go.opentelemetry.io/proto/otlp v0.7.0 google.golang.org/grpc v1.40.0 ) diff --git a/go.sum b/go.sum index cf20b2b..f02a64e 100644 --- a/go.sum +++ b/go.sum @@ -1119,6 +1119,7 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/collector v0.28.0/go.mod h1:AP/BTXwo1eedoJO7V+HQ68CSvJU1lcdqOzJCgt1VsNs= +go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= diff --git a/protocol/otlp/decoder.go b/protocol/otlp/decoder.go new file mode 100644 index 0000000..7a4dc20 --- /dev/null +++ b/protocol/otlp/decoder.go @@ -0,0 +1,51 @@ +package otlp + +import ( + "bytes" + "context" + "net/http" + "sync" + + "github.com/signalfx/golib/v3/datapoint/dpsink" + "github.com/signalfx/golib/v3/log" + "github.com/signalfx/ingest-protocols/protocol" + "github.com/signalfx/ingest-protocols/protocol/signalfx" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + "google.golang.org/protobuf/proto" +) + +type httpMetricDecoder struct { + sink dpsink.Sink + logger log.Logger + buffs sync.Pool +} + +func NewHTTPMetricDecoder(sink dpsink.Sink, logger log.Logger) signalfx.ErrorReader { + return &httpMetricDecoder{ + sink: sink, + logger: logger, + buffs: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, + } +} + +func (d *httpMetricDecoder) Read(ctx context.Context, req *http.Request) (err error) { + jeff := d.buffs.Get().(*bytes.Buffer) + defer d.buffs.Put(jeff) + jeff.Reset() + if err = protocol.ReadFromRequest(jeff, req, d.logger); err != nil { + return err + } + var msg metricsservicev1.ExportMetricsServiceRequest + if err = proto.Unmarshal(jeff.Bytes(), &msg); err != nil { + return err + } + dps := FromOTLPMetricRequest(&msg) + if len(dps) > 0 { + err = d.sink.AddDatapoints(ctx, dps) + } + return nil +} diff --git a/protocol/otlp/decoder_test.go b/protocol/otlp/decoder_test.go new file mode 100644 index 0000000..1149b86 --- /dev/null +++ b/protocol/otlp/decoder_test.go @@ -0,0 +1,99 @@ +package otlp + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "sync" + "testing" + + "github.com/signalfx/golib/v3/datapoint/dptest" + "github.com/signalfx/golib/v3/log" + . "github.com/smartystreets/goconvey/convey" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" + "google.golang.org/protobuf/proto" +) + +var errReadErr = errors.New("could not read") + +type errorReader struct{} + +func (errorReader *errorReader) Read([]byte) (int, error) { + return 0, errReadErr +} + +func TestDecoder(t *testing.T) { + Convey("httpMetricDecoder", t, func() { + sendTo := dptest.NewBasicSink() + decoder := NewHTTPMetricDecoder(sendTo, log.Discard) + + Convey("Bad request reading", func() { + req := &http.Request{ + Body: io.NopCloser(&errorReader{}), + } + req.ContentLength = 1 + ctx := context.Background() + So(decoder.Read(ctx, req), ShouldEqual, errReadErr) + }) + + Convey("Bad request content", func() { + req := &http.Request{ + Body: io.NopCloser(bytes.NewBufferString("asdf")), + } + req.ContentLength = 4 + ctx := context.Background() + So(decoder.Read(ctx, req), ShouldNotBeNil) + }) + + Convey("Good request", func(c C) { + var msg metricsservicev1.ExportMetricsServiceRequest + msg.ResourceMetrics = []*metricsv1.ResourceMetrics{ + { + InstrumentationLibraryMetrics: []*metricsv1.InstrumentationLibraryMetrics{ + { + Metrics: []*metricsv1.Metric{ + { + Name: "test", + Data: &metricsv1.Metric_IntGauge{ + IntGauge: &metricsv1.IntGauge{ + DataPoints: []*metricsv1.IntDataPoint{ + { + Labels: []*commonv1.StringKeyValue{}, + StartTimeUnixNano: 1000, + TimeUnixNano: 1000, + Value: 4, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + b, _ := proto.Marshal(&msg) + req := &http.Request{ + Body: io.NopCloser(bytes.NewBuffer(b)), + } + req.ContentLength = int64(len(b)) + ctx := context.Background() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + dp := <-sendTo.PointsChan + c.So(dp, ShouldNotBeNil) + wg.Done() + }() + + So(decoder.Read(ctx, req), ShouldBeNil) + + wg.Wait() + }) + }) +} diff --git a/protocol/otlp/metrics.go b/protocol/otlp/metrics.go new file mode 100644 index 0000000..af8ed7c --- /dev/null +++ b/protocol/otlp/metrics.go @@ -0,0 +1,440 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This code is copied and modified directly from the OTEL Collector: +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/1d6309bb62264cc7e2dda076ed95385b1ddef28a/pkg/translator/signalfx/from_metrics.go + +package otlp + +import ( + "encoding/json" + "math" + "strconv" + "time" + + "github.com/signalfx/golib/v3/datapoint" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" +) + +var ( + // Some standard dimension keys. + // upper bound dimension key for histogram buckets. + upperBoundDimensionKey = "upper_bound" + + // infinity bound dimension value is used on all histograms. + infinityBoundSFxDimValue = float64ToDimValue(math.Inf(1)) +) + +// FromOTLPMetricRequest converts the ResourceMetrics in an incoming request to SignalFx datapoints +func FromOTLPMetricRequest(md *metricsservicev1.ExportMetricsServiceRequest) ([]*datapoint.Datapoint) { + return FromOTLPResourceMetrics(md.GetResourceMetrics()) +} + +// FromMetrics converts OTLP ResourceMetrics to SignalFx datapoints. +func FromOTLPResourceMetrics(rms []*metricsv1.ResourceMetrics) ([]*datapoint.Datapoint) { + var sfxDps []*datapoint.Datapoint + + for _, rm := range rms { + for _, ilm := range rm.GetInstrumentationLibraryMetrics() { + for _, m := range ilm.GetMetrics() { + sfxDps = append(sfxDps, FromMetric(m)...) + } + } + + extraDimensions := attributesToDimensions(rm.GetResource().GetAttributes()) + for i := range sfxDps { + dpDims := sfxDps[i].Dimensions + for k, v := range extraDimensions { + if _, ok := dpDims[k]; !ok { + dpDims[k] = v + } + } + } + } + + return sfxDps +} + +// FromMetric converts a OTLP Metric to SignalFx datapoint(s). +func FromMetric(m *metricsv1.Metric) []*datapoint.Datapoint { + var dps []*datapoint.Datapoint + + basePoint := &datapoint.Datapoint{ + Metric: m.GetName(), + MetricType: fromMetricTypeToMetricType(m), + } + + data := m.GetData() + switch data.(type) { + case *metricsv1.Metric_IntGauge: + dps = convertIntDataPoints(m.GetIntGauge().GetDataPoints(), basePoint) + case *metricsv1.Metric_DoubleGauge: + dps = convertDoubleDataPoints(m.GetDoubleGauge().GetDataPoints(), basePoint) + + case *metricsv1.Metric_IntSum: + dps = convertIntDataPoints(m.GetIntSum().GetDataPoints(), basePoint) + case *metricsv1.Metric_DoubleSum: + dps = convertDoubleDataPoints(m.GetDoubleSum().GetDataPoints(), basePoint) + case *metricsv1.Metric_IntHistogram: + dps = convertIntHistogram(m.GetIntHistogram().GetDataPoints(), basePoint) + case *metricsv1.Metric_DoubleHistogram: + dps = convertDoubleHistogram(m.GetDoubleHistogram().GetDataPoints(), basePoint) + case *metricsv1.Metric_DoubleSummary: + dps = convertSummaryDataPoints(m.GetDoubleSummary().GetDataPoints(), m.GetName()) + } + + return dps +} + +func fromMetricTypeToMetricType(m *metricsv1.Metric) datapoint.MetricType { + data := m.GetData() + switch data.(type) { + case *metricsv1.Metric_IntGauge: + case *metricsv1.Metric_DoubleGauge: + return datapoint.Gauge + + case *metricsv1.Metric_IntSum: + if !m.GetIntSum().GetIsMonotonic() { + return datapoint.Gauge + } + if m.GetIntSum().GetAggregationTemporality() == metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA { + return datapoint.Count + } + return datapoint.Counter + + case *metricsv1.Metric_IntHistogram: + if m.GetIntHistogram().GetAggregationTemporality() == metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA { + return datapoint.Count + } + return datapoint.Counter + case *metricsv1.Metric_DoubleSum: + if !m.GetDoubleSum().GetIsMonotonic() { + return datapoint.Gauge + } + if m.GetDoubleSum().GetAggregationTemporality() == metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA { + return datapoint.Count + } + return datapoint.Counter + + case *metricsv1.Metric_DoubleHistogram: + if m.GetDoubleHistogram().GetAggregationTemporality() == metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA { + return datapoint.Count + } + return datapoint.Counter + case *metricsv1.Metric_DoubleSummary: + return datapoint.Counter + } + + return datapoint.Gauge +} + +func convertDoubleDataPoints(in []*metricsv1.DoubleDataPoint, basePoint *datapoint.Datapoint) []*datapoint.Datapoint { + out := make([]*datapoint.Datapoint, 0, len(in)) + + for _, inDp := range in { + dp := *basePoint + dp.Timestamp = time.Unix(0, int64(inDp.GetTimeUnixNano())) + dp.Dimensions = labelsToDimensions(inDp.GetLabels()) + + dp.Value = datapoint.NewFloatValue(inDp.GetValue()) + + out = append(out, &dp) + } + return out +} + +func convertIntDataPoints(in []*metricsv1.IntDataPoint, basePoint *datapoint.Datapoint) []*datapoint.Datapoint { + out := make([]*datapoint.Datapoint, 0, len(in)) + + for _, inDp := range in { + dp := *basePoint + dp.Timestamp = time.Unix(0, int64(inDp.GetTimeUnixNano())) + dp.Dimensions = labelsToDimensions(inDp.GetLabels()) + + dp.Value = datapoint.NewIntValue(inDp.GetValue()) + + out = append(out, &dp) + } + return out +} + +func convertIntHistogram(histDPs []*metricsv1.IntHistogramDataPoint, basePoint *datapoint.Datapoint) []*datapoint.Datapoint { + var out []*datapoint.Datapoint + + for _, histDP := range histDPs { + ts := time.Unix(0, int64(histDP.GetTimeUnixNano())) + + countDP := *basePoint + countDP.Metric = basePoint.Metric + "_count" + countDP.Timestamp = ts + countDP.Dimensions = labelsToDimensions(histDP.GetLabels()) + count := int64(histDP.GetCount()) + countDP.Value = datapoint.NewIntValue(count) + + sumDP := *basePoint + sumDP.Timestamp = ts + sumDP.Dimensions = labelsToDimensions(histDP.GetLabels()) + sum := histDP.GetSum() + sumDP.Value = datapoint.NewIntValue(sum) + + out = append(out, &countDP, &sumDP) + + bounds := histDP.GetExplicitBounds() + counts := histDP.GetBucketCounts() + + // Spec says counts is optional but if present it must have one more + // element than the bounds array. + if len(counts) > 0 && len(counts) != len(bounds)+1 { + continue + } + + for j, c := range counts { + bound := infinityBoundSFxDimValue + if j < len(bounds) { + bound = float64ToDimValue(bounds[j]) + } + + dp := *basePoint + dp.Metric = basePoint.Metric + "_bucket" + dp.Timestamp = ts + dp.Dimensions = labelsToDimensions(histDP.GetLabels()) + dp.Dimensions[upperBoundDimensionKey] = bound + dp.Value = datapoint.NewIntValue(int64(c)) + + out = append(out, &dp) + } + } + + return out +} + +func convertDoubleHistogram(histDPs []*metricsv1.DoubleHistogramDataPoint, basePoint *datapoint.Datapoint) []*datapoint.Datapoint { + var out []*datapoint.Datapoint + + for _, histDP := range histDPs { + ts := time.Unix(0, int64(histDP.GetTimeUnixNano())) + + countDP := *basePoint + countDP.Metric = basePoint.Metric + "_count" + countDP.Timestamp = ts + countDP.Dimensions = labelsToDimensions(histDP.GetLabels()) + count := int64(histDP.GetCount()) + countDP.Value = datapoint.NewIntValue(count) + + sumDP := *basePoint + sumDP.Timestamp = ts + sumDP.Dimensions = labelsToDimensions(histDP.GetLabels()) + sum := histDP.GetSum() + sumDP.Value = datapoint.NewFloatValue(sum) + + out = append(out, &countDP, &sumDP) + + bounds := histDP.GetExplicitBounds() + counts := histDP.GetBucketCounts() + + // Spec says counts is optional but if present it must have one more + // element than the bounds array. + if len(counts) > 0 && len(counts) != len(bounds)+1 { + continue + } + + for j, c := range counts { + bound := infinityBoundSFxDimValue + if j < len(bounds) { + bound = float64ToDimValue(bounds[j]) + } + + dp := *basePoint + dp.Metric = basePoint.Metric + "_bucket" + dp.Timestamp = ts + dp.Dimensions = labelsToDimensions(histDP.GetLabels()) + dp.Dimensions[upperBoundDimensionKey] = bound + dp.Value = datapoint.NewIntValue(int64(c)) + + out = append(out, &dp) + } + } + + return out +} + +func convertSummaryDataPoints( + in []*metricsv1.DoubleSummaryDataPoint, + name string, +) []*datapoint.Datapoint { + out := make([]*datapoint.Datapoint, 0, len(in)) + + for _, inDp := range in { + dims := labelsToDimensions(inDp.GetLabels()) + ts := time.Unix(0, int64(inDp.GetTimeUnixNano())) + + countPt := datapoint.Datapoint{ + Metric: name + "_count", + Timestamp: ts, + Dimensions: dims, + MetricType: datapoint.Counter, + } + c := int64(inDp.GetCount()) + countPt.Value = datapoint.NewIntValue(c) + out = append(out, &countPt) + + sumPt := datapoint.Datapoint{ + Metric: name, + Timestamp: ts, + Dimensions: dims, + MetricType: datapoint.Counter, + } + sum := inDp.GetSum() + sumPt.Value = datapoint.NewFloatValue(sum) + out = append(out, &sumPt) + + qvs := inDp.GetQuantileValues() + for _, qv := range qvs { + qPt := datapoint.Datapoint{ + Metric: name + "_quantile", + Timestamp: ts, + Dimensions: mergeStringMaps(dims, map[string]string{ + "quantile": strconv.FormatFloat(qv.GetQuantile(), 'f', -1, 64), + }), + MetricType: datapoint.Gauge, + } + qPt.Value = datapoint.NewFloatValue(qv.GetValue()) + out = append(out, &qPt) + } + } + return out +} + +func attributesToDimensions(attributes []*commonv1.KeyValue) map[string]string { + dimensions := make(map[string]string, len(attributes)) + if len(attributes) == 0 { + return dimensions + } + for _, kv := range attributes { + v := stringifyAnyValue(kv.GetValue()) + if v == "" { + // Don't bother setting things that serialize to nothing + continue + } + + dimensions[kv.Key] = v + } + return dimensions +} + +func stringifyAnyValue(a *commonv1.AnyValue) string { + var v string + if a == nil { + return "" + } + switch a.GetValue().(type) { + case *commonv1.AnyValue_StringValue: + v = a.GetStringValue() + + case *commonv1.AnyValue_BoolValue: + v = strconv.FormatBool(a.GetBoolValue()) + + case *commonv1.AnyValue_DoubleValue: + v = float64ToDimValue(a.GetDoubleValue()) + + case *commonv1.AnyValue_IntValue: + v = strconv.FormatInt(a.GetIntValue(), 10) + + case *commonv1.AnyValue_KvlistValue, *commonv1.AnyValue_ArrayValue: + jsonStr, _ := json.Marshal(anyValueToRaw(a)) + v = string(jsonStr) + } + + return v +} + +func anyValueToRaw(a *commonv1.AnyValue) interface{} { + var v interface{} + if a == nil { + return nil + } + switch a.GetValue().(type) { + case *commonv1.AnyValue_StringValue: + v = a.GetStringValue() + + case *commonv1.AnyValue_BoolValue: + v = a.GetBoolValue() + + case *commonv1.AnyValue_DoubleValue: + v = a.GetDoubleValue() + + case *commonv1.AnyValue_IntValue: + v = a.GetIntValue() + + case *commonv1.AnyValue_KvlistValue: + kvl := a.GetKvlistValue() + tv := make(map[string]interface{}, len(kvl.Values)) + for _, kv := range kvl.Values { + tv[kv.Key] = anyValueToRaw(kv.Value) + } + v = tv + + case *commonv1.AnyValue_ArrayValue: + av := a.GetArrayValue() + tv := make([]interface{}, len(av.Values)) + for i := range av.Values { + tv[i] = anyValueToRaw(av.Values[i]) + } + v = tv + } + return v +} + +func labelsToDimensions(attributes []*commonv1.StringKeyValue) map[string]string { + dimensions := make(map[string]string, len(attributes)) + if len(attributes) == 0 { + return dimensions + } + for _, kv := range attributes { + dimensions[kv.Key] = kv.Value + } + return dimensions +} + +// Is equivalent to strconv.FormatFloat(f, 'g', -1, 64), but hardcodes a few common cases for increased efficiency. +func float64ToDimValue(f float64) string { + // Parameters below are the same used by Prometheus + // see https://github.com/prometheus/common/blob/b5fe7d854c42dc7842e48d1ca58f60feae09d77b/expfmt/text_create.go#L450 + // SignalFx agent uses a different pattern + // https://github.com/signalfx/signalfx-agent/blob/5779a3de0c9861fa07316fd11b3c4ff38c0d78f0/internal/monitors/prometheusexporter/conversion.go#L77 + // The important issue here is consistency with the exporter, opting for the + // more common one used by Prometheus. + switch { + case f == 0: + return "0" + case f == 1: + return "1" + case math.IsInf(f, +1): + return "+Inf" + default: + return strconv.FormatFloat(f, 'g', -1, 64) + } +} + +func mergeStringMaps(ms ...map[string]string) map[string]string { + out := make(map[string]string) + for _, m := range ms { + for k, v := range m { + out[k] = v + } + } + return out +} diff --git a/protocol/otlp/metrics_test.go b/protocol/otlp/metrics_test.go new file mode 100644 index 0000000..33d14fd --- /dev/null +++ b/protocol/otlp/metrics_test.go @@ -0,0 +1,796 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + "math" + "testing" + "time" + + "github.com/signalfx/golib/v3/datapoint" + . "github.com/smartystreets/goconvey/convey" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" + resourcev1 "go.opentelemetry.io/proto/otlp/resource/v1" +) + +const ( + unixSecs = int64(1574092046) + unixNSecs = int64(11 * time.Millisecond) + tsMSecs = unixSecs*1e3 + unixNSecs/1e6 +) + +var ts = time.Unix(unixSecs, unixNSecs) + +func Test_FromMetrics(t *testing.T) { + labelMap := map[string]string{ + "k0": "v0", + "k1": "v1", + } + + const doubleVal = 1234.5678 + makeDoublePt := func() *metricsv1.DoubleDataPoint { + return &metricsv1.DoubleDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: doubleVal, + } + } + + makeDoublePtWithLabels := func() *metricsv1.DoubleDataPoint { + pt := makeDoublePt() + pt.Labels = stringMapToAttributeMap(labelMap) + return pt + } + + const int64Val = int64(123) + makeInt64Pt := func() *metricsv1.IntDataPoint { + return &metricsv1.IntDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: int64Val, + } + } + + makeInt64PtWithLabels := func() *metricsv1.IntDataPoint { + pt := makeInt64Pt() + pt.Labels = stringMapToAttributeMap(labelMap) + return pt + } + + histBounds := []float64{1, 2, 4} + histCounts := []uint64{4, 2, 3, 7} + + makeDoubleHistDP := func() *metricsv1.DoubleHistogramDataPoint { + return &metricsv1.DoubleHistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100.0, + ExplicitBounds: histBounds, + BucketCounts: histCounts, + Labels: stringMapToAttributeMap(labelMap), + } + } + doubleHistDP := makeDoubleHistDP() + + makeDoubleHistDPBadCounts := func() *metricsv1.DoubleHistogramDataPoint { + return &metricsv1.DoubleHistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100.0, + ExplicitBounds: histBounds, + BucketCounts: []uint64{4}, + Labels: stringMapToAttributeMap(labelMap), + } + } + + makeIntHistDP := func() *metricsv1.IntHistogramDataPoint { + return &metricsv1.IntHistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100, + ExplicitBounds: histBounds, + BucketCounts: histCounts, + Labels: stringMapToAttributeMap(labelMap), + } + } + intHistDP := makeIntHistDP() + + makeIntHistDPBadCounts := func() *metricsv1.IntHistogramDataPoint { + return &metricsv1.IntHistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: 100, + ExplicitBounds: histBounds, + BucketCounts: []uint64{4}, + Labels: stringMapToAttributeMap(labelMap), + } + } + + makeHistDPNoBuckets := func() *metricsv1.IntHistogramDataPoint { + return &metricsv1.IntHistogramDataPoint{ + Count: 2, + Sum: 10, + TimeUnixNano: uint64(ts.UnixNano()), + Labels: stringMapToAttributeMap(labelMap), + } + } + histDPNoBuckets := makeHistDPNoBuckets() + + const summarySumVal = 123.4 + const summaryCountVal = 111 + + makeSummaryDP := func() *metricsv1.DoubleSummaryDataPoint { + summaryDP := &metricsv1.DoubleSummaryDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Sum: summarySumVal, + Count: summaryCountVal, + Labels: stringMapToAttributeMap(labelMap), + } + for i := 0; i < 4; i++ { + summaryDP.QuantileValues = append(summaryDP.QuantileValues, &metricsv1.DoubleSummaryDataPoint_ValueAtQuantile{ + Quantile: 0.25 * float64(i+1), + Value: float64(i), + }) + } + return summaryDP + } + + makeEmptySummaryDP := func() *metricsv1.DoubleSummaryDataPoint { + return &metricsv1.DoubleSummaryDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Sum: summarySumVal, + Count: summaryCountVal, + Labels: stringMapToAttributeMap(labelMap), + } + } + + tests := []struct { + name string + metricsFn func() []*metricsv1.ResourceMetrics + wantSfxDataPoints []*datapoint.Datapoint + }{ + { + name: "nil_node_nil_resources_no_dims", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "gauge_double_with_no_dims", + Data: &metricsv1.Metric_DoubleGauge{ + DoubleGauge: &metricsv1.DoubleGauge{ + DataPoints: []*metricsv1.DoubleDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "gauge_int_with_no_dims", + Data: &metricsv1.Metric_IntGauge{ + IntGauge: &metricsv1.IntGauge{ + DataPoints: []*metricsv1.IntDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "cumulative_double_with_no_dims", + Data: &metricsv1.Metric_DoubleSum{ + DoubleSum: &metricsv1.DoubleSum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricsv1.DoubleDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "cumulative_int_with_no_dims", + Data: &metricsv1.Metric_IntSum{ + IntSum: &metricsv1.IntSum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricsv1.IntDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "delta_double_with_no_dims", + Data: &metricsv1.Metric_DoubleSum{ + DoubleSum: &metricsv1.DoubleSum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricsv1.DoubleDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "delta_int_with_no_dims", + Data: &metricsv1.Metric_IntSum{ + IntSum: &metricsv1.IntSum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricsv1.IntDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "gauge_sum_double_with_no_dims", + Data: &metricsv1.Metric_DoubleSum{ + DoubleSum: &metricsv1.DoubleSum{ + IsMonotonic: false, + DataPoints: []*metricsv1.DoubleDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "gauge_sum_int_with_no_dims", + Data: &metricsv1.Metric_IntSum{ + IntSum: &metricsv1.IntSum{ + IsMonotonic: false, + DataPoints: []*metricsv1.IntDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + } + + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: []*datapoint.Datapoint{ + doubleSFxDataPoint("gauge_double_with_no_dims", datapoint.Gauge, nil, doubleVal), + int64SFxDataPoint("gauge_int_with_no_dims", datapoint.Gauge, nil, int64Val), + doubleSFxDataPoint("cumulative_double_with_no_dims", datapoint.Counter, nil, doubleVal), + int64SFxDataPoint("cumulative_int_with_no_dims", datapoint.Counter, nil, int64Val), + doubleSFxDataPoint("delta_double_with_no_dims", datapoint.Count, nil, doubleVal), + int64SFxDataPoint("delta_int_with_no_dims", datapoint.Count, nil, int64Val), + doubleSFxDataPoint("gauge_sum_double_with_no_dims", datapoint.Gauge, nil, doubleVal), + int64SFxDataPoint("gauge_sum_int_with_no_dims", datapoint.Gauge, nil, int64Val), + }, + }, + { + name: "nil_node_and_resources_with_dims", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "gauge_double_with_dims", + Data: &metricsv1.Metric_DoubleGauge{ + DoubleGauge: &metricsv1.DoubleGauge{ + DataPoints: []*metricsv1.DoubleDataPoint{ + makeDoublePtWithLabels(), + }, + }, + }, + }, + { + Name: "gauge_int_with_dims", + Data: &metricsv1.Metric_IntGauge{ + IntGauge: &metricsv1.IntGauge{ + DataPoints: []*metricsv1.IntDataPoint{ + makeInt64PtWithLabels(), + }, + }, + }, + }, + { + Name: "cumulative_double_with_dims", + Data: &metricsv1.Metric_DoubleSum{ + DoubleSum: &metricsv1.DoubleSum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricsv1.DoubleDataPoint{ + makeDoublePtWithLabels(), + }, + }, + }, + }, + { + Name: "cumulative_int_with_dims", + Data: &metricsv1.Metric_IntSum{ + IntSum: &metricsv1.IntSum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricsv1.IntDataPoint{ + makeInt64PtWithLabels(), + }, + }, + }, + }, + } + + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: []*datapoint.Datapoint{ + doubleSFxDataPoint("gauge_double_with_dims", datapoint.Gauge, labelMap, doubleVal), + int64SFxDataPoint("gauge_int_with_dims", datapoint.Gauge, labelMap, int64Val), + doubleSFxDataPoint("cumulative_double_with_dims", datapoint.Counter, labelMap, doubleVal), + int64SFxDataPoint("cumulative_int_with_dims", datapoint.Counter, labelMap, int64Val), + }, + }, + { + name: "with_node_resources_dims", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{ + Resource: &resourcev1.Resource{ + Attributes: []*commonv1.KeyValue{ + { + Key: "k_r0", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_r0"}}, + }, + { + Key: "k_r1", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_r1"}}, + }, + { + Key: "k_n0", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_n0"}}, + }, + { + Key: "k_n1", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_n1"}}, + }, + }, + }, + } + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "gauge_double_with_dims", + Data: &metricsv1.Metric_DoubleGauge{ + DoubleGauge: &metricsv1.DoubleGauge{ + DataPoints: []*metricsv1.DoubleDataPoint{ + makeDoublePtWithLabels(), + }, + }, + }, + }, + { + Name: "gauge_int_with_dims", + Data: &metricsv1.Metric_IntGauge{ + IntGauge: &metricsv1.IntGauge{ + DataPoints: []*metricsv1.IntDataPoint{ + makeInt64PtWithLabels(), + }, + }, + }, + }, + } + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: []*datapoint.Datapoint{ + doubleSFxDataPoint( + "gauge_double_with_dims", + datapoint.Gauge, + mergeStringMaps(map[string]string{ + "k_n0": "v_n0", + "k_n1": "v_n1", + "k_r0": "v_r0", + "k_r1": "v_r1", + }, labelMap), + doubleVal), + int64SFxDataPoint( + "gauge_int_with_dims", + datapoint.Gauge, + mergeStringMaps(map[string]string{ + "k_n0": "v_n0", + "k_n1": "v_n1", + "k_r0": "v_r0", + "k_r1": "v_r1", + }, labelMap), + int64Val), + }, + }, + { + name: "histograms", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "int_histo", + Data: &metricsv1.Metric_IntHistogram{ + IntHistogram: &metricsv1.IntHistogram{ + DataPoints: []*metricsv1.IntHistogramDataPoint{ + makeIntHistDP(), + }, + }, + }, + }, + { + Name: "int_delta_histo", + Data: &metricsv1.Metric_IntHistogram{ + IntHistogram: &metricsv1.IntHistogram{ + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricsv1.IntHistogramDataPoint{ + makeIntHistDP(), + }, + }, + }, + }, + { + Name: "double_histo", + Data: &metricsv1.Metric_DoubleHistogram{ + DoubleHistogram: &metricsv1.DoubleHistogram{ + DataPoints: []*metricsv1.DoubleHistogramDataPoint{ + makeDoubleHistDP(), + }, + }, + }, + }, + { + Name: "double_delta_histo", + Data: &metricsv1.Metric_DoubleHistogram{ + DoubleHistogram: &metricsv1.DoubleHistogram{ + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricsv1.DoubleHistogramDataPoint{ + makeDoubleHistDP(), + }, + }, + }, + }, + { + Name: "double_histo_bad_counts", + Data: &metricsv1.Metric_DoubleHistogram{ + DoubleHistogram: &metricsv1.DoubleHistogram{ + DataPoints: []*metricsv1.DoubleHistogramDataPoint{ + makeDoubleHistDPBadCounts(), + }, + }, + }, + }, + { + Name: "int_histo_bad_counts", + Data: &metricsv1.Metric_IntHistogram{ + IntHistogram: &metricsv1.IntHistogram{ + DataPoints: []*metricsv1.IntHistogramDataPoint{ + makeIntHistDPBadCounts(), + }, + }, + }, + }, + } + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: mergeDPs( + expectedFromIntHistogram("int_histo", labelMap, *intHistDP, false), + expectedFromIntHistogram("int_delta_histo", labelMap, *intHistDP, true), + expectedFromDoubleHistogram("double_histo", labelMap, *doubleHistDP, false), + expectedFromDoubleHistogram("double_delta_histo", labelMap, *doubleHistDP, true), + []*datapoint.Datapoint{ + int64SFxDataPoint("double_histo_bad_counts_count", datapoint.Counter, labelMap, int64(doubleHistDP.Count)), + doubleSFxDataPoint("double_histo_bad_counts", datapoint.Counter, labelMap, doubleHistDP.Sum), + }, + []*datapoint.Datapoint{ + int64SFxDataPoint("int_histo_bad_counts_count", datapoint.Counter, labelMap, int64(intHistDP.Count)), + int64SFxDataPoint("int_histo_bad_counts", datapoint.Counter, labelMap, intHistDP.Sum), + }, + ), + }, + { + name: "distribution_no_buckets", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "no_bucket_histo", + Data: &metricsv1.Metric_IntHistogram{ + IntHistogram: &metricsv1.IntHistogram{ + DataPoints: []*metricsv1.IntHistogramDataPoint{ + makeHistDPNoBuckets(), + }, + }, + }, + }, + } + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: expectedFromIntHistogram("no_bucket_histo", labelMap, *histDPNoBuckets, false), + }, + { + name: "summaries", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "summary", + Data: &metricsv1.Metric_DoubleSummary{ + DoubleSummary: &metricsv1.DoubleSummary{ + DataPoints: []*metricsv1.DoubleSummaryDataPoint{ + makeSummaryDP(), + }, + }, + }, + }, + } + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: expectedFromSummary("summary", labelMap, summaryCountVal, summarySumVal), + }, + { + name: "empty_summary", + metricsFn: func() []*metricsv1.ResourceMetrics { + out := &metricsv1.ResourceMetrics{} + ilm := &metricsv1.InstrumentationLibraryMetrics{} + out.InstrumentationLibraryMetrics = append(out.InstrumentationLibraryMetrics, ilm) + + ilm.Metrics = []*metricsv1.Metric{ + { + Name: "empty_summary", + Data: &metricsv1.Metric_DoubleSummary{ + DoubleSummary: &metricsv1.DoubleSummary{ + DataPoints: []*metricsv1.DoubleSummaryDataPoint{ + makeEmptySummaryDP(), + }, + }, + }, + }, + } + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: expectedFromEmptySummary("empty_summary", labelMap, summaryCountVal, summarySumVal), + }, + } + for _, tt := range tests { + Convey(tt.name, t, func() { + rms := tt.metricsFn() + gotSfxDataPoints := FromOTLPMetricRequest(&metricsservicev1.ExportMetricsServiceRequest{ResourceMetrics: rms}) + So(tt.wantSfxDataPoints, ShouldResemble, gotSfxDataPoints) + }) + } +} + +func TestAttributesToDimensions(t *testing.T) { + Convey("attributesToDimensions", t, func() { + attrs := []*commonv1.KeyValue{ + { + Key: "a", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "s"}}, + }, + { + Key: "b", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: ""}}, + }, + { + Key: "c", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BoolValue{BoolValue: true}}, + }, + { + Key: "d", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_IntValue{IntValue: 44}}, + }, + { + Key: "e", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 45.1}}, + }, + { + Key: "f", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_ArrayValue{ArrayValue: &commonv1.ArrayValue{ + Values: []*commonv1.AnyValue{ + {Value: &commonv1.AnyValue_StringValue{StringValue: "n1"}}, + {Value: &commonv1.AnyValue_StringValue{StringValue: "n2"}}, + }}}}, + }, + { + Key: "g", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_KvlistValue{KvlistValue: &commonv1.KeyValueList{ + Values: []*commonv1.KeyValue{ + {Key: "k1", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "n1"}}}, + {Key: "k2", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BoolValue{BoolValue: false}}}, + {Key: "k3", Value: nil}, + {Key: "k4", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 40.3}}}, + {Key: "k5", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_IntValue{IntValue: 41}}}, + }}}}, + }, + { + Key: "h", + Value: nil, + }, + { + Key: "i", + Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 0}}, + }, + } + + dims := attributesToDimensions(attrs) + So(dims, ShouldResemble, map[string]string{ + "a": "s", + "c": "true", + "d": "44", + "e": "45.1", + "f": `["n1","n2"]`, + "g": `{"k1":"n1","k2":false,"k3":null,"k4":40.3,"k5":41}`, + "i": "0", + }) + }) +} + +func doubleSFxDataPoint( + metric string, + metricType datapoint.MetricType, + dims map[string]string, + val float64, +) *datapoint.Datapoint { + return &datapoint.Datapoint{ + Metric: metric, + Timestamp: ts, + Value: datapoint.NewFloatValue(val), + MetricType: metricType, + Dimensions: cloneStringMap(dims), + } +} + +func int64SFxDataPoint( + metric string, + metricType datapoint.MetricType, + dims map[string]string, + val int64, +) *datapoint.Datapoint { + return &datapoint.Datapoint{ + Metric: metric, + Timestamp: ts, + Value: datapoint.NewIntValue(val), + MetricType: metricType, + Dimensions: cloneStringMap(dims), + } +} + +func expectedFromDoubleHistogram( + metricName string, + dims map[string]string, + histDP metricsv1.DoubleHistogramDataPoint, + isDelta bool, +) []*datapoint.Datapoint { + buckets := histDP.GetBucketCounts() + + dps := make([]*datapoint.Datapoint, 0) + + typ := datapoint.Counter + if isDelta { + typ = datapoint.Count + } + + dps = append(dps, + int64SFxDataPoint(metricName+"_count", typ, dims, int64(histDP.GetCount())), + doubleSFxDataPoint(metricName, typ, dims, histDP.GetSum())) + + explicitBounds := histDP.GetExplicitBounds() + if explicitBounds == nil { + return dps + } + for i := 0; i < len(explicitBounds); i++ { + dimsCopy := cloneStringMap(dims) + dimsCopy[upperBoundDimensionKey] = float64ToDimValue(explicitBounds[i]) + dps = append(dps, int64SFxDataPoint(metricName+"_bucket", typ, dimsCopy, int64(buckets[i]))) + } + dimsCopy := cloneStringMap(dims) + dimsCopy[upperBoundDimensionKey] = float64ToDimValue(math.Inf(1)) + dps = append(dps, int64SFxDataPoint(metricName+"_bucket", typ, dimsCopy, int64(buckets[len(buckets)-1]))) + return dps +} + +func expectedFromIntHistogram( + metricName string, + dims map[string]string, + histDP metricsv1.IntHistogramDataPoint, + isDelta bool, +) []*datapoint.Datapoint { + buckets := histDP.GetBucketCounts() + + dps := make([]*datapoint.Datapoint, 0) + + typ := datapoint.Counter + if isDelta { + typ = datapoint.Count + } + + dps = append(dps, + int64SFxDataPoint(metricName+"_count", typ, dims, int64(histDP.GetCount())), + int64SFxDataPoint(metricName, typ, dims, histDP.GetSum())) + + explicitBounds := histDP.GetExplicitBounds() + if explicitBounds == nil { + return dps + } + for i := 0; i < len(explicitBounds); i++ { + dimsCopy := cloneStringMap(dims) + dimsCopy[upperBoundDimensionKey] = float64ToDimValue(explicitBounds[i]) + dps = append(dps, int64SFxDataPoint(metricName+"_bucket", typ, dimsCopy, int64(buckets[i]))) + } + dimsCopy := cloneStringMap(dims) + dimsCopy[upperBoundDimensionKey] = float64ToDimValue(math.Inf(1)) + dps = append(dps, int64SFxDataPoint(metricName+"_bucket", typ, dimsCopy, int64(buckets[len(buckets)-1]))) + return dps +} + +func expectedFromSummary(name string, labelMap map[string]string, count int64, sumVal float64) []*datapoint.Datapoint { + countName := name + "_count" + countPt := int64SFxDataPoint(countName, datapoint.Counter, labelMap, count) + sumPt := doubleSFxDataPoint(name, datapoint.Counter, labelMap, sumVal) + out := []*datapoint.Datapoint{countPt, sumPt} + quantileDimVals := []string{"0.25", "0.5", "0.75", "1"} + for i := 0; i < 4; i++ { + qDims := map[string]string{"quantile": quantileDimVals[i]} + qPt := doubleSFxDataPoint( + name+"_quantile", + datapoint.Gauge, + mergeStringMaps(labelMap, qDims), + float64(i), + ) + out = append(out, qPt) + } + return out +} + +func expectedFromEmptySummary(name string, labelMap map[string]string, count int64, sumVal float64) []*datapoint.Datapoint { + countName := name + "_count" + countPt := int64SFxDataPoint(countName, datapoint.Counter, labelMap, count) + sumPt := doubleSFxDataPoint(name, datapoint.Counter, labelMap, sumVal) + return []*datapoint.Datapoint{countPt, sumPt} +} + +func mergeDPs(dps ...[]*datapoint.Datapoint) []*datapoint.Datapoint { + var out []*datapoint.Datapoint + for i := range dps { + out = append(out, dps[i]...) + } + return out +} + +func cloneStringMap(m map[string]string) map[string]string { + out := make(map[string]string) + for k, v := range m { + out[k] = v + } + return out +} + +func stringMapToAttributeMap(m map[string]string) []*commonv1.StringKeyValue { + ret := make([]*commonv1.StringKeyValue, 0, len(m)) + for k, v := range m { + ret = append(ret, &commonv1.StringKeyValue{ + Key: k, + Value: v, + }) + } + return ret +} diff --git a/protocol/signalfx/datapoint.go b/protocol/signalfx/datapoint.go index 1406357..6ecc5f6 100644 --- a/protocol/signalfx/datapoint.go +++ b/protocol/signalfx/datapoint.go @@ -25,6 +25,7 @@ import ( "github.com/signalfx/golib/v3/sfxclient" "github.com/signalfx/golib/v3/web" "github.com/signalfx/ingest-protocols/logkey" + "github.com/signalfx/ingest-protocols/protocol" signalfxformat "github.com/signalfx/ingest-protocols/protocol/signalfx/format" ) @@ -138,7 +139,7 @@ func (decoder *ProtobufDecoderV2) Read(ctx context.Context, req *http.Request) ( jeff := buffs.Get().(*bytes.Buffer) defer buffs.Put(jeff) jeff.Reset() - if err = readFromRequest(jeff, req, decoder.Logger); err != nil { + if err = protocol.ReadFromRequest(jeff, req, decoder.Logger); err != nil { return err } var msg sfxmodel.DataPointUploadMessage diff --git a/protocol/signalfx/event.go b/protocol/signalfx/event.go index 6a7e4d3..228df61 100644 --- a/protocol/signalfx/event.go +++ b/protocol/signalfx/event.go @@ -15,6 +15,7 @@ import ( "github.com/signalfx/golib/v3/pointer" "github.com/signalfx/golib/v3/sfxclient" "github.com/signalfx/golib/v3/web" + "github.com/signalfx/ingest-protocols/protocol" signalfxformat "github.com/signalfx/ingest-protocols/protocol/signalfx/format" ) @@ -28,7 +29,7 @@ func (decoder *ProtobufEventDecoderV2) Read(ctx context.Context, req *http.Reque jeff := buffs.Get().(*bytes.Buffer) defer buffs.Put(jeff) jeff.Reset() - if err = readFromRequest(jeff, req, decoder.Logger); err != nil { + if err = protocol.ReadFromRequest(jeff, req, decoder.Logger); err != nil { return err } var msg sfxmodel.EventUploadMessage diff --git a/protocol/signalfx/helper.go b/protocol/util.go similarity index 70% rename from protocol/signalfx/helper.go rename to protocol/util.go index f19aa85..29067ef 100644 --- a/protocol/signalfx/helper.go +++ b/protocol/util.go @@ -1,4 +1,4 @@ -package signalfx +package protocol import ( "bytes" @@ -8,7 +8,8 @@ import ( "github.com/signalfx/ingest-protocols/logkey" ) -func readFromRequest(jeff *bytes.Buffer, req *http.Request, logger log.Logger) error { +// ReadFromRequest reads all the http body into the jeff buffer and logs if there is an error. +func ReadFromRequest(jeff *bytes.Buffer, req *http.Request, logger log.Logger) error { // for compressed transactions, contentLength isn't trustworthy readLen, err := jeff.ReadFrom(req.Body) if err != nil {