Skip to content

Commit

Permalink
Change Kafka receiver to not use internal data (#2697)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Mar 16, 2021
1 parent da04a20 commit adaf38c
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 28 deletions.
2 changes: 1 addition & 1 deletion receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestWithUnmarshallers(t *testing.T) {
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = new(otlpProtoUnmarshaller).Encoding()
cfg.Encoding = new(otlpTracesPbUnmarshaller).Encoding()
exporter, err := f.CreateTracesReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
require.NoError(t, err)
assert.NotNil(t, exporter)
Expand Down
12 changes: 4 additions & 8 deletions receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/kafkaexporter"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
)

func TestNewReceiver_version_err(t *testing.T) {
Expand Down Expand Up @@ -135,7 +134,7 @@ func TestConsumerGroupHandler(t *testing.T) {
defer view.Unregister(views...)

c := consumerGroupHandler{
unmarshaller: &otlpProtoUnmarshaller{},
unmarshaller: &otlpTracesPbUnmarshaller{},
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewTracesNop(),
Expand Down Expand Up @@ -179,7 +178,7 @@ func TestConsumerGroupHandler(t *testing.T) {

func TestConsumerGroupHandler_error_unmarshall(t *testing.T) {
c := consumerGroupHandler{
unmarshaller: &otlpProtoUnmarshaller{},
unmarshaller: &otlpTracesPbUnmarshaller{},
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewTracesNop(),
Expand All @@ -203,7 +202,7 @@ func TestConsumerGroupHandler_error_unmarshall(t *testing.T) {
func TestConsumerGroupHandler_error_nextConsumer(t *testing.T) {
consumerError := errors.New("failed to consumer")
c := consumerGroupHandler{
unmarshaller: &otlpProtoUnmarshaller{},
unmarshaller: &otlpTracesPbUnmarshaller{},
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewTracesErr(consumerError),
Expand All @@ -222,10 +221,7 @@ func TestConsumerGroupHandler_error_nextConsumer(t *testing.T) {

td := pdata.NewTraces()
td.ResourceSpans().Resize(1)
request := &otlptrace.ExportTraceServiceRequest{
ResourceSpans: pdata.TracesToOtlp(td),
}
bts, err := request.Marshal()
bts, err := td.ToOtlpProtoBytes()
require.NoError(t, err)
groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
close(groupClaim.messageChan)
Expand Down
18 changes: 7 additions & 11 deletions receiver/kafkareceiver/otlp_unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,19 @@ package kafkareceiver

import (
"go.opentelemetry.io/collector/consumer/pdata"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
)

type otlpProtoUnmarshaller struct {
type otlpTracesPbUnmarshaller struct {
}

var _ Unmarshaller = (*otlpProtoUnmarshaller)(nil)
var _ Unmarshaller = (*otlpTracesPbUnmarshaller)(nil)

func (p *otlpProtoUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) {
request := &otlptrace.ExportTraceServiceRequest{}
err := request.Unmarshal(bytes)
if err != nil {
return pdata.NewTraces(), err
}
return pdata.TracesFromOtlp(request.GetResourceSpans()), nil
func (p *otlpTracesPbUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) {
td := pdata.NewTraces()
err := td.FromOtlpProtoBytes(bytes)
return td, err
}

func (*otlpProtoUnmarshaller) Encoding() string {
func (*otlpTracesPbUnmarshaller) Encoding() string {
return defaultEncoding
}
12 changes: 5 additions & 7 deletions receiver/kafkareceiver/otlp_unmarshaller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,26 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer/pdata"
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
)

func TestUnmarshallOTLP(t *testing.T) {
td := pdata.NewTraces()
td.ResourceSpans().Resize(1)
td.ResourceSpans().At(0).Resource().Attributes().InsertString("foo", "bar")
request := &otlptrace.ExportTraceServiceRequest{
ResourceSpans: pdata.TracesToOtlp(td),
}
expected, err := request.Marshal()

expected, err := td.ToOtlpProtoBytes()
require.NoError(t, err)

p := otlpProtoUnmarshaller{}
p := otlpTracesPbUnmarshaller{}
got, err := p.Unmarshal(expected)
require.NoError(t, err)

assert.Equal(t, td, got)
assert.Equal(t, "otlp_proto", p.Encoding())
}

func TestUnmarshallOTLP_error(t *testing.T) {
p := otlpProtoUnmarshaller{}
p := otlpTracesPbUnmarshaller{}
got, err := p.Unmarshal([]byte("+$%"))
assert.Equal(t, pdata.NewTraces(), got)
assert.Error(t, err)
Expand Down
2 changes: 1 addition & 1 deletion receiver/kafkareceiver/unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Unmarshaller interface {

// defaultUnmarshallers returns map of supported encodings with Unmarshaller.
func defaultUnmarshallers() map[string]Unmarshaller {
otlp := &otlpProtoUnmarshaller{}
otlp := &otlpTracesPbUnmarshaller{}
jaegerProto := jaegerProtoSpanUnmarshaller{}
jaegerJSON := jaegerJSONSpanUnmarshaller{}
zipkinProto := zipkinProtoSpanUnmarshaller{}
Expand Down

0 comments on commit adaf38c

Please sign in to comment.