Skip to content

Commit

Permalink
Add OTLP translation
Browse files Browse the repository at this point in the history
This adds the protocol/otlp package that can translate from OTLP metric types
to SignalFx datapoints.  It follows the same conversion process as the OTEL
collector, but that code cannot be reused due to the internal format used within
the OTEL Collector.

This also adds a decoder that reads from an HTTP request and sends to a
sink.
  • Loading branch information
benkeith-splunk committed Feb 16, 2022
1 parent ccfdf7b commit 8a13824
Show file tree
Hide file tree
Showing 9 changed files with 1,395 additions and 4 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/smartystreets/assertions v1.0.1
github.com/smartystreets/goconvey v1.6.4
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/proto/otlp v0.7.0
google.golang.org/grpc v1.40.0
)

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,7 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/collector v0.28.0/go.mod h1:AP/BTXwo1eedoJO7V+HQ68CSvJU1lcdqOzJCgt1VsNs=
go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
51 changes: 51 additions & 0 deletions protocol/otlp/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package otlp

import (
"bytes"
"context"
"net/http"
"sync"

"github.com/signalfx/golib/v3/datapoint/dpsink"
"github.com/signalfx/golib/v3/log"
"github.com/signalfx/ingest-protocols/protocol"
"github.com/signalfx/ingest-protocols/protocol/signalfx"
metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
"google.golang.org/protobuf/proto"
)

type httpMetricDecoder struct {
sink dpsink.Sink
logger log.Logger
buffs sync.Pool
}

func NewHTTPMetricDecoder(sink dpsink.Sink, logger log.Logger) signalfx.ErrorReader {
return &httpMetricDecoder{
sink: sink,
logger: logger,
buffs: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
}
}

func (d *httpMetricDecoder) Read(ctx context.Context, req *http.Request) (err error) {
jeff := d.buffs.Get().(*bytes.Buffer)
defer d.buffs.Put(jeff)
jeff.Reset()
if err = protocol.ReadFromRequest(jeff, req, d.logger); err != nil {
return err
}
var msg metricsservicev1.ExportMetricsServiceRequest
if err = proto.Unmarshal(jeff.Bytes(), &msg); err != nil {
return err
}
dps := FromOTLPMetricRequest(&msg)
if len(dps) > 0 {
err = d.sink.AddDatapoints(ctx, dps)
}
return nil
}
99 changes: 99 additions & 0 deletions protocol/otlp/decoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package otlp

import (
"bytes"
"context"
"errors"
"io"
"net/http"
"sync"
"testing"

"github.com/signalfx/golib/v3/datapoint/dptest"
"github.com/signalfx/golib/v3/log"
. "github.com/smartystreets/goconvey/convey"
metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1"
"google.golang.org/protobuf/proto"
)

var errReadErr = errors.New("could not read")

type errorReader struct{}

func (errorReader *errorReader) Read([]byte) (int, error) {
return 0, errReadErr
}

func TestDecoder(t *testing.T) {
Convey("httpMetricDecoder", t, func() {
sendTo := dptest.NewBasicSink()
decoder := NewHTTPMetricDecoder(sendTo, log.Discard)

Convey("Bad request reading", func() {
req := &http.Request{
Body: io.NopCloser(&errorReader{}),
}
req.ContentLength = 1
ctx := context.Background()
So(decoder.Read(ctx, req), ShouldEqual, errReadErr)
})

Convey("Bad request content", func() {
req := &http.Request{
Body: io.NopCloser(bytes.NewBufferString("asdf")),
}
req.ContentLength = 4
ctx := context.Background()
So(decoder.Read(ctx, req), ShouldNotBeNil)
})

Convey("Good request", func(c C) {
var msg metricsservicev1.ExportMetricsServiceRequest
msg.ResourceMetrics = []*metricsv1.ResourceMetrics{
{
InstrumentationLibraryMetrics: []*metricsv1.InstrumentationLibraryMetrics{
{
Metrics: []*metricsv1.Metric{
{
Name: "test",
Data: &metricsv1.Metric_IntGauge{
IntGauge: &metricsv1.IntGauge{
DataPoints: []*metricsv1.IntDataPoint{
{
Labels: []*commonv1.StringKeyValue{},
StartTimeUnixNano: 1000,
TimeUnixNano: 1000,
Value: 4,
},
},
},
},
},
},
},
},
},
}
b, _ := proto.Marshal(&msg)
req := &http.Request{
Body: io.NopCloser(bytes.NewBuffer(b)),
}
req.ContentLength = int64(len(b))
ctx := context.Background()

var wg sync.WaitGroup
wg.Add(1)
go func() {
dp := <-sendTo.PointsChan
c.So(dp, ShouldNotBeNil)
wg.Done()
}()

So(decoder.Read(ctx, req), ShouldBeNil)

wg.Wait()
})
})
}
Loading

0 comments on commit 8a13824

Please sign in to comment.