From 52d1414547c3fb6cad40610d034ec558288fabc4 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Mon, 20 Jan 2025 09:51:08 -0600 Subject: [PATCH] [chore] Add internal attribute package (#12073) This PR creates a new internal package which defines the correct set of attributes for each kind of component. This is a subset of #12057 which is broken off in order to reduce the overall size of that PR. As such, this package will not actually be used until #12057 is merged. --- service/internal/attribute/attribute.go | 103 ++++++++++ service/internal/attribute/attribute_test.go | 193 +++++++++++++++++++ 2 files changed, 296 insertions(+) create mode 100644 service/internal/attribute/attribute.go create mode 100644 service/internal/attribute/attribute_test.go diff --git a/service/internal/attribute/attribute.go b/service/internal/attribute/attribute.go new file mode 100644 index 00000000000..7bfdf217961 --- /dev/null +++ b/service/internal/attribute/attribute.go @@ -0,0 +1,103 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package attribute // import "go.opentelemetry.io/collector/service/internal/attribute" + +import ( + "hash/fnv" + + "go.opentelemetry.io/otel/attribute" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pipeline" +) + +const ( + componentKindKey = "otelcol.component.kind" + componentIDKey = "otelcol.component.id" + pipelineIDKey = "otelcol.pipeline.id" + signalKey = "otelcol.signal" + signalOutputKey = "otelcol.signal.output" + + capabiltiesKind = "capabilities" + fanoutKind = "fanout" +) + +type Attributes struct { + set attribute.Set + id int64 +} + +func newAttributes(attrs ...attribute.KeyValue) *Attributes { + h := fnv.New64a() + for _, kv := range attrs { + h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")")) + } + return &Attributes{ + set: attribute.NewSet(attrs...), + id: int64(h.Sum64()), // #nosec G115 + } +} + +func (a Attributes) Attributes() *attribute.Set { + return &a.set +} + +func (a Attributes) ID() int64 { + return a.id +} + +func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindReceiver.String()), + attribute.String(signalKey, pipelineType.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +func Processor(pipelineID pipeline.ID, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindProcessor.String()), + attribute.String(signalKey, pipelineID.Signal().String()), + attribute.String(pipelineIDKey, pipelineID.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindExporter.String()), + attribute.String(signalKey, pipelineType.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindConnector.String()), + attribute.String(signalKey, exprPipelineType.String()), + attribute.String(signalOutputKey, rcvrPipelineType.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +func Capabilities(pipelineID pipeline.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, capabiltiesKind), + attribute.String(pipelineIDKey, pipelineID.String()), + ) +} + +func Fanout(pipelineID pipeline.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, fanoutKind), + attribute.String(pipelineIDKey, pipelineID.String()), + ) +} + +func Extension(id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindExtension.String()), + attribute.String(componentIDKey, id.String()), + ) +} diff --git a/service/internal/attribute/attribute_test.go b/service/internal/attribute/attribute_test.go new file mode 100644 index 00000000000..6025f77a20b --- /dev/null +++ b/service/internal/attribute/attribute_test.go @@ -0,0 +1,193 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package attribute + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/xpipeline" +) + +var ( + signals = []pipeline.Signal{ + pipeline.SignalTraces, + pipeline.SignalMetrics, + pipeline.SignalLogs, + xpipeline.SignalProfiles, + } + + cIDs = []component.ID{ + component.MustNewID("foo"), + component.MustNewID("foo2"), + component.MustNewID("bar"), + } + + pIDs = []pipeline.ID{ + pipeline.MustNewID("traces"), + pipeline.MustNewIDWithName("traces", "2"), + pipeline.MustNewID("metrics"), + pipeline.MustNewIDWithName("metrics", "2"), + pipeline.MustNewID("logs"), + pipeline.MustNewIDWithName("logs", "2"), + pipeline.MustNewID("profiles"), + pipeline.MustNewIDWithName("profiles", "2"), + } +) + +func TestReceiver(t *testing.T) { + for _, sig := range signals { + for _, id := range cIDs { + r := Receiver(sig, id) + componentKind, ok := r.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindReceiver.String(), componentKind.AsString()) + + signal, ok := r.Attributes().Value(signalKey) + require.True(t, ok) + require.Equal(t, sig.String(), signal.AsString()) + + componentID, ok := r.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } +} + +func TestProcessor(t *testing.T) { + for _, pID := range pIDs { + for _, id := range cIDs { + p := Processor(pID, id) + componentKind, ok := p.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindProcessor.String(), componentKind.AsString()) + + pipelineID, ok := p.Attributes().Value(pipelineIDKey) + require.True(t, ok) + require.Equal(t, pID.String(), pipelineID.AsString()) + + componentID, ok := p.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } +} + +func TestExporter(t *testing.T) { + for _, sig := range signals { + for _, id := range cIDs { + e := Exporter(sig, id) + componentKind, ok := e.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindExporter.String(), componentKind.AsString()) + + signal, ok := e.Attributes().Value(signalKey) + require.True(t, ok) + require.Equal(t, sig.String(), signal.AsString()) + + componentID, ok := e.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } +} + +func TestConnector(t *testing.T) { + for _, exprSig := range signals { + for _, rcvrSig := range signals { + for _, id := range cIDs { + c := Connector(exprSig, rcvrSig, id) + componentKind, ok := c.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindConnector.String(), componentKind.AsString()) + + signal, ok := c.Attributes().Value(signalKey) + require.True(t, ok) + require.Equal(t, exprSig.String(), signal.AsString()) + + signalOutput, ok := c.Attributes().Value(signalOutputKey) + require.True(t, ok) + require.Equal(t, rcvrSig.String(), signalOutput.AsString()) + + componentID, ok := c.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } + } +} + +func TestExtension(t *testing.T) { + e := Extension(component.MustNewID("foo")) + componentKind, ok := e.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindExtension.String(), componentKind.AsString()) +} + +func TestSetEquality(t *testing.T) { + // The sets are created independently but should be exactly equivalent. + // We will ensure that corresponding elements are equal and that + // non-corresponding elements are not equal. + setI, setJ := createExampleSets(), createExampleSets() + for i, ei := range setI { + for j, ej := range setJ { + if i == j { + require.Equal(t, ei.ID(), ej.ID()) + require.True(t, ei.Attributes().Equals(ej.Attributes())) + } else { + require.NotEqual(t, ei.ID(), ej.ID()) + require.False(t, ei.Attributes().Equals(ej.Attributes())) + } + } + } +} + +func createExampleSets() []*Attributes { + sets := []*Attributes{} + + // Receiver examples. + for _, sig := range signals { + for _, id := range cIDs { + sets = append(sets, Receiver(sig, id)) + } + } + + // Processor examples. + for _, pID := range pIDs { + for _, cID := range cIDs { + sets = append(sets, Processor(pID, cID)) + } + } + + // Exporter examples. + for _, sig := range signals { + for _, id := range cIDs { + sets = append(sets, Exporter(sig, id)) + } + } + + // Connector examples. + for _, exprSig := range signals { + for _, rcvrSig := range signals { + for _, id := range cIDs { + sets = append(sets, Connector(exprSig, rcvrSig, id)) + } + } + } + + // Capabilities examples. + for _, pID := range pIDs { + sets = append(sets, Capabilities(pID)) + } + + // Fanout examples. + for _, pID := range pIDs { + sets = append(sets, Fanout(pID)) + } + + return sets +}