Skip to content

Commit

Permalink
Ensure span links and events generate events and get resource attrs (h…
Browse files Browse the repository at this point in the history
…oneycombio#264)

When generating events for OTLP span events and links, each sub-event should also get resource attributes added.
  • Loading branch information
MikeGoldsmith authored Jun 25, 2021
1 parent 2ee9513 commit 8179634
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 11 deletions.
2 changes: 1 addition & 1 deletion route/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (r *Router) apiKeyChecker(next http.Handler) http.Handler {
return
}
}
err = errors.New(fmt.Sprintf("api key %s not found in list of authed keys", apiKey))
err = fmt.Errorf("api key %s not found in list of authed keys", apiKey)
r.handlerReturnWithError(w, ErrAuthNeeded, err)
})
}
Expand Down
49 changes: 43 additions & 6 deletions route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,18 +477,18 @@ func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServ
eventAttrs[k] = v
}

events := make([]*types.Event, len(span.Events)+1)
events[0] = &types.Event{
events := make([]*types.Event, 0, 1+len(span.Events)+len(span.Links))
events = append(events, &types.Event{
Context: ctx,
APIHost: apiHost,
APIKey: apiKey,
Dataset: dataset,
SampleRate: uint(sampleRate),
Timestamp: timestamp,
Data: eventAttrs,
}
})

for i, sevent := range span.Events {
for _, sevent := range span.Events {
timestamp := time.Unix(0, int64(sevent.TimeUnixNano)).UTC()
attrs := map[string]interface{}{
"trace.trace_id": traceID,
Expand All @@ -501,22 +501,59 @@ func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServ
if sevent.Attributes != nil {
addAttributesToMap(attrs, sevent.Attributes)
}
for k, v := range resourceAttrs {
attrs[k] = v
}
sampleRate, err := getSampleRateFromAttributes(attrs)
if err != nil {
debugLog.
WithField("error", err.Error()).
WithField("sampleRate", attrs["sampleRate"]).
Logf("error parsing sampleRate")
}
events[i+1] = &types.Event{
events = append(events, &types.Event{
Context: ctx,
APIHost: apiHost,
APIKey: apiKey,
Dataset: dataset,
SampleRate: uint(sampleRate),
Timestamp: timestamp,
Data: attrs,
})
}

for _, slink := range span.Links {
attrs := map[string]interface{}{
"trace.trace_id": traceID,
"trace.parent_id": spanID,
"trace.link.trace_id": bytesToTraceID(slink.TraceId),
"trace.link.span_id": hex.EncodeToString(slink.SpanId),
"parent_name": span.Name,
"meta.annotation_type": "link",
}

if slink.Attributes != nil {
addAttributesToMap(attrs, slink.Attributes)
}
for k, v := range resourceAttrs {
attrs[k] = v
}
sampleRate, err := getSampleRateFromAttributes(attrs)
if err != nil {
debugLog.
WithField("error", err.Error()).
WithField("sampleRate", attrs["sampleRate"]).
Logf("error parsing sampleRate")
}
events = append(events, &types.Event{
Context: ctx,
APIHost: apiHost,
APIKey: apiKey,
Dataset: dataset,
SampleRate: uint(sampleRate),
Timestamp: time.Time{}, //links don't have timestamps, so use empty time
Data: attrs,
})
}

for _, evt := range events {
Expand Down Expand Up @@ -886,7 +923,7 @@ func getSampleRateFromAttributes(attrs map[string]interface{}) (int, error) {
sampleRate = int(v)
}
default:
err = fmt.Errorf("Unrecognised sampleRate datatype - %T", sampleRate)
err = fmt.Errorf("unrecognised sampleRate datatype - %T", sampleRate)
sampleRate = defaultSampleRate
}
// remove sampleRate from event fields
Expand Down
117 changes: 113 additions & 4 deletions route/route_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
// +build all race

package route

import (
"bytes"
"compress/gzip"
"context"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
Expand All @@ -25,6 +24,7 @@ import (
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/transmit"
"github.com/stretchr/testify/assert"

"github.com/gorilla/mux"
"github.com/honeycombio/refinery/sharder"
Expand Down Expand Up @@ -405,16 +405,29 @@ func TestOTLPHandler(t *testing.T) {

mockMetrics := metrics.MockMetrics{}
mockMetrics.Start()
mockTransmission := &transmit.MockTransmission{}
mockTransmission.Start()
router := &Router{
Config: &config.MockConfig{},
Metrics: &mockMetrics,
UpstreamTransmission: &transmit.MockTransmission{},
UpstreamTransmission: mockTransmission,
iopLogger: iopLogger{
Logger: &logger.MockLogger{},
Logger: &logger.NullLogger{},
incomingOrPeer: "incoming",
},
}

conf := &config.MockConfig{
GetSendDelayVal: 0,
GetTraceTimeoutVal: 60 * time.Second,
GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
SendTickerVal: 2 * time.Millisecond,
GetInMemoryCollectorCacheCapacityVal: config.InMemoryCollectorCacheCapacity{
CacheCapacity: 100,
MaxAlloc: 100,
},
}

t.Run("span with status", func(t *testing.T) {
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*trace.ResourceSpans{{
Expand All @@ -427,6 +440,8 @@ func TestOTLPHandler(t *testing.T) {
if err != nil {
t.Errorf(`Unexpected error: %s`, err)
}
assert.Equal(t, 2, len(mockTransmission.Events))
mockTransmission.Flush()
})

t.Run("span without status", func(t *testing.T) {
Expand All @@ -441,6 +456,100 @@ func TestOTLPHandler(t *testing.T) {
if err != nil {
t.Errorf(`Unexpected error: %s`, err)
}
assert.Equal(t, 2, len(mockTransmission.Events))
mockTransmission.Flush()
})

// TODO: (MG) figuure out how we can test JSON created from OTLP requests
// Below is example, but requires significant usage of collector, sampler, conf, etc
t.Run("creates events for span events", func(t *testing.T) {
t.Skip("need additional work to support inspecting outbound JSON")

traceID := []byte{0, 0, 0, 0, 1}
spanID := []byte{1, 0, 0, 0, 0}
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*trace.ResourceSpans{{
InstrumentationLibrarySpans: []*trace.InstrumentationLibrarySpans{{
Spans: []*trace.Span{{
TraceId: traceID,
SpanId: spanID,
Name: "span_with_event",
Events: []*trace.Span_Event{{
TimeUnixNano: 12345,
Name: "span_link",
Attributes: []*common.KeyValue{{
Key: "event_attr_key", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "event_attr_val"}},
}},
}},
}},
}},
}},
}
_, err := router.Export(ctx, req)
if err != nil {
t.Errorf(`Unexpected error: %s`, err)
}

time.Sleep(conf.SendTickerVal * 2)

mockTransmission.Mux.Lock()
assert.Equal(t, 2, len(mockTransmission.Events))

spanEvent := mockTransmission.Events[0]
// assert.Equal(t, time.Unix(0, int64(12345)).UTC(), spanEvent.Timestamp)
assert.Equal(t, bytesToTraceID(traceID), spanEvent.Data["trace.trace_id"])
assert.Equal(t, hex.EncodeToString(spanID), spanEvent.Data["trace.span_id"])
assert.Equal(t, "span_link", spanEvent.Data["span.name"])
assert.Equal(t, "span_with_event", spanEvent.Data["parent.name"])
assert.Equal(t, "span_event", spanEvent.Data["meta.annotation_type"])
assert.Equal(t, "event_attr_key", spanEvent.Data["event_attr_val"])
mockTransmission.Mux.Unlock()
mockTransmission.Flush()
})

t.Run("creates events for span links", func(t *testing.T) {
t.Skip("need additional work to support inspecting outbound JSON")

traceID := []byte{0, 0, 0, 0, 1}
spanID := []byte{1, 0, 0, 0, 0}
linkTraceID := []byte{0, 0, 0, 0, 2}
linkSpanID := []byte{2, 0, 0, 0, 0}

req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*trace.ResourceSpans{{
InstrumentationLibrarySpans: []*trace.InstrumentationLibrarySpans{{
Spans: []*trace.Span{{
Name: "span_with_link",
TraceId: traceID,
SpanId: spanID,
Links: []*trace.Span_Link{{
TraceId: traceID,
SpanId: spanID,
TraceState: "link_trace_state",
Attributes: []*common.KeyValue{{
Key: "link_attr_key", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "link_attr_val"}},
}},
}},
}},
}},
}},
}
_, err := router.Export(ctx, req)
if err != nil {
t.Errorf(`Unexpected error: %s`, err)
}

time.Sleep(conf.SendTickerVal * 2)
assert.Equal(t, 2, len(mockTransmission.Events))

spanLink := mockTransmission.Events[1]
assert.Equal(t, bytesToTraceID(traceID), spanLink.Data["trace.trace_id"])
assert.Equal(t, hex.EncodeToString(spanID), spanLink.Data["trace.span_id"])
assert.Equal(t, bytesToTraceID(linkTraceID), spanLink.Data["trace.link.trace_id"])
assert.Equal(t, hex.EncodeToString(linkSpanID), spanLink.Data["trace.link.span_id"])
assert.Equal(t, "link", spanLink.Data["meta.annotation_type"])
assert.Equal(t, "link_attr_val", spanLink.Data["link_attr_key"])
mockTransmission.Flush()
})
}

Expand Down

0 comments on commit 8179634

Please sign in to comment.