Skip to content

Commit

Permalink
Add resource processor extractor
Browse files Browse the repository at this point in the history
  • Loading branch information
Garbett1 committed Jan 9, 2025
1 parent 4140f6d commit 93c25de
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 7 deletions.
10 changes: 7 additions & 3 deletions processor/loghouseprocessor/go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/processor/loghouseprocessor

go 1.21.0
go 1.22.0

toolchain go1.23.2

require (
github.com/json-iterator/go v1.1.12
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.105.1-0.20240717163034-43ed6184f9fe
go.opentelemetry.io/collector/confmap v0.105.1-0.20240717163034-43ed6184f9fe
go.opentelemetry.io/collector/consumer v0.105.1-0.20240717163034-43ed6184f9fe
Expand All @@ -17,6 +19,8 @@ require (

)

require github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.117.0

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand All @@ -41,7 +45,7 @@ require (
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.105.1-0.20240717163034-43ed6184f9fe // indirect
go.opentelemetry.io/collector/featuregate v1.12.1-0.20240716231837-5753a58f712b // indirect
go.opentelemetry.io/collector/featuregate v1.23.0 // indirect
go.opentelemetry.io/collector/internal/globalgates v0.105.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.105.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.105.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions processor/loghouseprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions processor/loghouseprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -55,6 +56,9 @@ func (p *logProcessor) ConsumeLogs(ctx context.Context, l plog.Logs) error {
if err != nil {
p.logger.Debug("failed to parse log line", zap.Error(err))
}
// This does have a "last line wins" if we have somehow set the same key with different values.
// We are just going to ignore this case for now though.
promoteResourceAttrs(&logLine, &rlogs)
}
}
}
Expand Down Expand Up @@ -144,6 +148,15 @@ func processJSONLog(l *plog.LogRecord) {
}
}

func promoteResourceAttrs(l *plog.LogRecord, rlogs *plog.ResourceLogs) {
attributes, ok := l.Attributes().Get("resource")
if !ok {
return
}
merged := maps.MergeRawMaps(rlogs.Resource().Attributes().AsRaw(), attributes.Map().AsRaw())
rlogs.Resource().Attributes().FromRaw(merged)
}

// Both funcs copied from: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/internal/ids.go#L25

func ParseSpanID(spanIDStr string) (pcommon.SpanID, error) {
Expand Down
63 changes: 63 additions & 0 deletions processor/loghouseprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,66 @@ func Test_promoteTraceAndSpan(t *testing.T) {
})
}
}

func Test_promoteResourceAttrs(t *testing.T) {
t.Run("single log", func(t *testing.T) {
rl := plog.NewResourceLogs()
l := plog.NewLogRecord()
l.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}})

promoteResourceAttrs(&l, &rl)

val, ok := rl.Resource().Attributes().Get("r1")
assert.True(t, ok)
assert.Equal(t, "v1", val.Str())
})

t.Run("two logs", func(t *testing.T) {
rl := plog.NewResourceLogs()
l1 := plog.NewLogRecord()
l1.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}})
l2 := plog.NewLogRecord()
l2.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r2": "v2"}})

promoteResourceAttrs(&l1, &rl)
promoteResourceAttrs(&l2, &rl)

v1, ok := rl.Resource().Attributes().Get("r1")
assert.True(t, ok)
assert.Equal(t, "v1", v1.Str())

v2, ok := rl.Resource().Attributes().Get("r2")
assert.True(t, ok)
assert.Equal(t, "v2", v2.Str())
})

t.Run("last wins", func(t *testing.T) {
rl := plog.NewResourceLogs()
l1 := plog.NewLogRecord()
l1.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}})
l2 := plog.NewLogRecord()
l2.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v2"}})

promoteResourceAttrs(&l1, &rl)
promoteResourceAttrs(&l2, &rl)

v2, ok := rl.Resource().Attributes().Get("r1")
assert.True(t, ok)
assert.Equal(t, "v2", v2.Str())
})

t.Run("overwrite original", func(t *testing.T) {
rl := plog.NewResourceLogs()
rl.Resource().Attributes().FromRaw(map[string]any{"r1": "original"})
l1 := plog.NewLogRecord()
l1.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}})

promoteResourceAttrs(&l1, &rl)

v1, ok := rl.Resource().Attributes().Get("r1")
assert.True(t, ok)
assert.Equal(t, "v1", v1.Str())

})

}

0 comments on commit 93c25de

Please sign in to comment.