From 93a01ee553883328b79a2b3d530caf9bd4b3f013 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Sun, 14 Mar 2021 13:10:41 -0700 Subject: [PATCH] Change Kafka receiver to not use internal data Signed-off-by: Bogdan Drutu --- receiver/kafkareceiver/factory_test.go | 2 +- receiver/kafkareceiver/kafka_receiver_test.go | 12 ++++-------- receiver/kafkareceiver/otlp_unmarshaller.go | 18 +++++++----------- .../kafkareceiver/otlp_unmarshaller_test.go | 12 +++++------- receiver/kafkareceiver/unmarshaller.go | 2 +- 5 files changed, 18 insertions(+), 28 deletions(-) diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index 4417776a9ef..855d76aa80d 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -73,7 +73,7 @@ func TestWithUnmarshallers(t *testing.T) { require.NotNil(t, exporter) }) t.Run("default_encoding", func(t *testing.T) { - cfg.Encoding = new(otlpProtoUnmarshaller).Encoding() + cfg.Encoding = new(otlpTracesPbUnmarshaller).Encoding() exporter, err := f.CreateTracesReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) require.NoError(t, err) assert.NotNil(t, exporter) diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 32e2305ede8..515a36c6157 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -35,7 +35,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/kafkaexporter" - otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" ) func TestNewReceiver_version_err(t *testing.T) { @@ -135,7 +134,7 @@ func TestConsumerGroupHandler(t *testing.T) { defer view.Unregister(views...) c := consumerGroupHandler{ - unmarshaller: &otlpProtoUnmarshaller{}, + unmarshaller: &otlpTracesPbUnmarshaller{}, logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewTracesNop(), @@ -179,7 +178,7 @@ func TestConsumerGroupHandler(t *testing.T) { func TestConsumerGroupHandler_error_unmarshall(t *testing.T) { c := consumerGroupHandler{ - unmarshaller: &otlpProtoUnmarshaller{}, + unmarshaller: &otlpTracesPbUnmarshaller{}, logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewTracesNop(), @@ -203,7 +202,7 @@ func TestConsumerGroupHandler_error_unmarshall(t *testing.T) { func TestConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consumer") c := consumerGroupHandler{ - unmarshaller: &otlpProtoUnmarshaller{}, + unmarshaller: &otlpTracesPbUnmarshaller{}, logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewTracesErr(consumerError), @@ -222,10 +221,7 @@ func TestConsumerGroupHandler_error_nextConsumer(t *testing.T) { td := pdata.NewTraces() td.ResourceSpans().Resize(1) - request := &otlptrace.ExportTraceServiceRequest{ - ResourceSpans: pdata.TracesToOtlp(td), - } - bts, err := request.Marshal() + bts, err := td.ToOtlpProtoBytes() require.NoError(t, err) groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} close(groupClaim.messageChan) diff --git a/receiver/kafkareceiver/otlp_unmarshaller.go b/receiver/kafkareceiver/otlp_unmarshaller.go index 9e63989ba2e..4a1621377a7 100644 --- a/receiver/kafkareceiver/otlp_unmarshaller.go +++ b/receiver/kafkareceiver/otlp_unmarshaller.go @@ -16,23 +16,19 @@ package kafkareceiver import ( "go.opentelemetry.io/collector/consumer/pdata" - otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" ) -type otlpProtoUnmarshaller struct { +type otlpTracesPbUnmarshaller struct { } -var _ Unmarshaller = (*otlpProtoUnmarshaller)(nil) +var _ Unmarshaller = (*otlpTracesPbUnmarshaller)(nil) -func (p *otlpProtoUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { - request := &otlptrace.ExportTraceServiceRequest{} - err := request.Unmarshal(bytes) - if err != nil { - return pdata.NewTraces(), err - } - return pdata.TracesFromOtlp(request.GetResourceSpans()), nil +func (p *otlpTracesPbUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { + td := pdata.NewTraces() + err := td.FromOtlpProtoBytes(bytes) + return td, err } -func (*otlpProtoUnmarshaller) Encoding() string { +func (*otlpTracesPbUnmarshaller) Encoding() string { return defaultEncoding } diff --git a/receiver/kafkareceiver/otlp_unmarshaller_test.go b/receiver/kafkareceiver/otlp_unmarshaller_test.go index bd6aabc71d2..09cfb28d90c 100644 --- a/receiver/kafkareceiver/otlp_unmarshaller_test.go +++ b/receiver/kafkareceiver/otlp_unmarshaller_test.go @@ -21,28 +21,26 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer/pdata" - otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" ) func TestUnmarshallOTLP(t *testing.T) { td := pdata.NewTraces() td.ResourceSpans().Resize(1) td.ResourceSpans().At(0).Resource().Attributes().InsertString("foo", "bar") - request := &otlptrace.ExportTraceServiceRequest{ - ResourceSpans: pdata.TracesToOtlp(td), - } - expected, err := request.Marshal() + + expected, err := td.ToOtlpProtoBytes() require.NoError(t, err) - p := otlpProtoUnmarshaller{} + p := otlpTracesPbUnmarshaller{} got, err := p.Unmarshal(expected) require.NoError(t, err) + assert.Equal(t, td, got) assert.Equal(t, "otlp_proto", p.Encoding()) } func TestUnmarshallOTLP_error(t *testing.T) { - p := otlpProtoUnmarshaller{} + p := otlpTracesPbUnmarshaller{} got, err := p.Unmarshal([]byte("+$%")) assert.Equal(t, pdata.NewTraces(), got) assert.Error(t, err) diff --git a/receiver/kafkareceiver/unmarshaller.go b/receiver/kafkareceiver/unmarshaller.go index 5a2972607c2..25680fa0632 100644 --- a/receiver/kafkareceiver/unmarshaller.go +++ b/receiver/kafkareceiver/unmarshaller.go @@ -29,7 +29,7 @@ type Unmarshaller interface { // defaultUnmarshallers returns map of supported encodings with Unmarshaller. func defaultUnmarshallers() map[string]Unmarshaller { - otlp := &otlpProtoUnmarshaller{} + otlp := &otlpTracesPbUnmarshaller{} jaegerProto := jaegerProtoSpanUnmarshaller{} jaegerJSON := jaegerJSONSpanUnmarshaller{} zipkinProto := zipkinProtoSpanUnmarshaller{}