-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This adds the protocol/otlp package that can translate from OTLP metric types to SignalFx datapoints. It follows the same conversion process as the OTEL collector, but that code cannot be reused due to the internal format used within the OTEL Collector. This also adds a decoder that reads from an HTTP request and sends to a sink.
- Loading branch information
1 parent
ccfdf7b
commit b304cf4
Showing
10 changed files
with
1,370 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package otlp | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"net/http" | ||
"sync" | ||
|
||
"github.com/signalfx/golib/v3/datapoint/dpsink" | ||
"github.com/signalfx/golib/v3/log" | ||
"github.com/signalfx/ingest-protocols/logkey" | ||
"github.com/signalfx/ingest-protocols/protocol" | ||
"github.com/signalfx/ingest-protocols/protocol/signalfx" | ||
metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" | ||
"google.golang.org/protobuf/proto" | ||
) | ||
|
||
type httpMetricDecoder struct { | ||
sink dpsink.Sink | ||
logger log.Logger | ||
buffs sync.Pool | ||
} | ||
|
||
func NewHTTPMetricDecoder(sink dpsink.Sink, logger log.Logger) signalfx.ErrorReader { | ||
return &httpMetricDecoder{ | ||
sink: sink, | ||
logger: log.NewContext(logger).With(logkey.Protocol, "otlp"), | ||
buffs: sync.Pool{ | ||
New: func() interface{} { | ||
return new(bytes.Buffer) | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
func (d *httpMetricDecoder) Read(ctx context.Context, req *http.Request) (err error) { | ||
jeff := d.buffs.Get().(*bytes.Buffer) | ||
defer d.buffs.Put(jeff) | ||
jeff.Reset() | ||
if err = protocol.ReadFromRequest(jeff, req, d.logger); err != nil { | ||
return err | ||
} | ||
var msg metricsservicev1.ExportMetricsServiceRequest | ||
if err = proto.Unmarshal(jeff.Bytes(), &msg); err != nil { | ||
return err | ||
} | ||
dps := FromOTLPMetricRequest(&msg) | ||
if len(dps) > 0 { | ||
err = d.sink.AddDatapoints(ctx, dps) | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
package otlp | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"io" | ||
"net/http" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/signalfx/golib/v3/datapoint/dptest" | ||
"github.com/signalfx/golib/v3/log" | ||
. "github.com/smartystreets/goconvey/convey" | ||
metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" | ||
commonv1 "go.opentelemetry.io/proto/otlp/common/v1" | ||
metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" | ||
"google.golang.org/protobuf/proto" | ||
) | ||
|
||
var errReadErr = errors.New("could not read") | ||
|
||
type errorReader struct{} | ||
|
||
func (errorReader *errorReader) Read([]byte) (int, error) { | ||
return 0, errReadErr | ||
} | ||
|
||
func TestDecoder(t *testing.T) { | ||
Convey("httpMetricDecoder", t, func() { | ||
sendTo := dptest.NewBasicSink() | ||
decoder := NewHTTPMetricDecoder(sendTo, log.Discard) | ||
|
||
Convey("Bad request reading", func() { | ||
req := &http.Request{ | ||
Body: io.NopCloser(&errorReader{}), | ||
} | ||
req.ContentLength = 1 | ||
ctx := context.Background() | ||
So(decoder.Read(ctx, req), ShouldEqual, errReadErr) | ||
}) | ||
|
||
Convey("Bad request content", func() { | ||
req := &http.Request{ | ||
Body: io.NopCloser(bytes.NewBufferString("asdf")), | ||
} | ||
req.ContentLength = 4 | ||
ctx := context.Background() | ||
So(decoder.Read(ctx, req), ShouldNotBeNil) | ||
}) | ||
|
||
Convey("Good request", func(c C) { | ||
var msg metricsservicev1.ExportMetricsServiceRequest | ||
msg.ResourceMetrics = []*metricsv1.ResourceMetrics{ | ||
{ | ||
InstrumentationLibraryMetrics: []*metricsv1.InstrumentationLibraryMetrics{ | ||
{ | ||
Metrics: []*metricsv1.Metric{ | ||
{ | ||
Name: "test", | ||
Data: &metricsv1.Metric_Gauge{ | ||
Gauge: &metricsv1.Gauge{ | ||
DataPoints: []*metricsv1.NumberDataPoint{ | ||
{ | ||
Attributes: []*commonv1.KeyValue{}, | ||
StartTimeUnixNano: 1000, | ||
TimeUnixNano: 1000, | ||
Value: &metricsv1.NumberDataPoint_AsInt{AsInt: 4}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
b, _ := proto.Marshal(&msg) | ||
req := &http.Request{ | ||
Body: io.NopCloser(bytes.NewBuffer(b)), | ||
} | ||
req.ContentLength = int64(len(b)) | ||
ctx := context.Background() | ||
|
||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
dp := <-sendTo.PointsChan | ||
c.So(dp, ShouldNotBeNil) | ||
wg.Done() | ||
}() | ||
|
||
So(decoder.Read(ctx, req), ShouldBeNil) | ||
|
||
wg.Wait() | ||
}) | ||
}) | ||
} |
Oops, something went wrong.