Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure span links and events generate events and get resource attrs #264

Merged
merged 7 commits into from
Jun 25, 2021
Merged
2 changes: 1 addition & 1 deletion route/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (r *Router) apiKeyChecker(next http.Handler) http.Handler {
return
}
}
err = errors.New(fmt.Sprintf("api key %s not found in list of authed keys", apiKey))
err = fmt.Errorf("api key %s not found in list of authed keys", apiKey)
r.handlerReturnWithError(w, ErrAuthNeeded, err)
})
}
Expand Down
49 changes: 43 additions & 6 deletions route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,18 +477,18 @@ func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServ
eventAttrs[k] = v
}

events := make([]*types.Event, len(span.Events)+1)
events[0] = &types.Event{
events := make([]*types.Event, 0, 1+len(span.Events)+len(span.Links))
events = append(events, &types.Event{
Context: ctx,
APIHost: apiHost,
APIKey: apiKey,
Dataset: dataset,
SampleRate: uint(sampleRate),
Timestamp: timestamp,
Data: eventAttrs,
}
})

for i, sevent := range span.Events {
for _, sevent := range span.Events {
timestamp := time.Unix(0, int64(sevent.TimeUnixNano)).UTC()
attrs := map[string]interface{}{
"trace.trace_id": traceID,
Expand All @@ -501,22 +501,59 @@ func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServ
if sevent.Attributes != nil {
addAttributesToMap(attrs, sevent.Attributes)
}
for k, v := range resourceAttrs {
attrs[k] = v
}
sampleRate, err := getSampleRateFromAttributes(attrs)
if err != nil {
debugLog.
WithField("error", err.Error()).
WithField("sampleRate", attrs["sampleRate"]).
Logf("error parsing sampleRate")
}
events[i+1] = &types.Event{
events = append(events, &types.Event{
Context: ctx,
APIHost: apiHost,
APIKey: apiKey,
Dataset: dataset,
SampleRate: uint(sampleRate),
Timestamp: timestamp,
Data: attrs,
})
}

for _, slink := range span.Links {
attrs := map[string]interface{}{
"trace.trace_id": traceID,
"trace.parent_id": spanID,
"trace.link.trace_id": bytesToTraceID(slink.TraceId),
"trace.link.span_id": hex.EncodeToString(slink.SpanId),
"parent_name": span.Name,
"meta.annotation_type": "link",
}

if slink.Attributes != nil {
addAttributesToMap(attrs, slink.Attributes)
}
for k, v := range resourceAttrs {
attrs[k] = v
}
sampleRate, err := getSampleRateFromAttributes(attrs)
if err != nil {
debugLog.
WithField("error", err.Error()).
WithField("sampleRate", attrs["sampleRate"]).
Logf("error parsing sampleRate")
}
events = append(events, &types.Event{
Context: ctx,
APIHost: apiHost,
APIKey: apiKey,
Dataset: dataset,
SampleRate: uint(sampleRate),
Timestamp: time.Time{}, //links don't have timestamps, so use empty time
Data: attrs,
})
}

for _, evt := range events {
Expand Down Expand Up @@ -886,7 +923,7 @@ func getSampleRateFromAttributes(attrs map[string]interface{}) (int, error) {
sampleRate = int(v)
}
default:
err = fmt.Errorf("Unrecognised sampleRate datatype - %T", sampleRate)
err = fmt.Errorf("unrecognised sampleRate datatype - %T", sampleRate)
sampleRate = defaultSampleRate
}
// remove sampleRate from event fields
Expand Down
117 changes: 113 additions & 4 deletions route/route_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
// +build all race

package route

import (
"bytes"
"compress/gzip"
"context"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
Expand All @@ -25,6 +24,7 @@ import (
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/transmit"
"github.com/stretchr/testify/assert"

"github.com/gorilla/mux"
"github.com/honeycombio/refinery/sharder"
Expand Down Expand Up @@ -405,16 +405,29 @@ func TestOTLPHandler(t *testing.T) {

mockMetrics := metrics.MockMetrics{}
mockMetrics.Start()
mockTransmission := &transmit.MockTransmission{}
mockTransmission.Start()
router := &Router{
Config: &config.MockConfig{},
Metrics: &mockMetrics,
UpstreamTransmission: &transmit.MockTransmission{},
UpstreamTransmission: mockTransmission,
iopLogger: iopLogger{
Logger: &logger.MockLogger{},
Logger: &logger.NullLogger{},
incomingOrPeer: "incoming",
},
}

conf := &config.MockConfig{
GetSendDelayVal: 0,
GetTraceTimeoutVal: 60 * time.Second,
GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
SendTickerVal: 2 * time.Millisecond,
GetInMemoryCollectorCacheCapacityVal: config.InMemoryCollectorCacheCapacity{
CacheCapacity: 100,
MaxAlloc: 100,
},
}

t.Run("span with status", func(t *testing.T) {
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*trace.ResourceSpans{{
Expand All @@ -427,6 +440,8 @@ func TestOTLPHandler(t *testing.T) {
if err != nil {
t.Errorf(`Unexpected error: %s`, err)
}
assert.Equal(t, 2, len(mockTransmission.Events))
mockTransmission.Flush()
})

t.Run("span without status", func(t *testing.T) {
Expand All @@ -441,6 +456,100 @@ func TestOTLPHandler(t *testing.T) {
if err != nil {
t.Errorf(`Unexpected error: %s`, err)
}
assert.Equal(t, 2, len(mockTransmission.Events))
mockTransmission.Flush()
})

// TODO: (MG) figuure out how we can test JSON created from OTLP requests
MikeGoldsmith marked this conversation as resolved.
Show resolved Hide resolved
// Below is example, but requires significant usage of collector, sampler, conf, etc
t.Run("creates events for span events", func(t *testing.T) {
t.Skip("need additional work to support inspecting outbound JSON")

traceID := []byte{0, 0, 0, 0, 1}
spanID := []byte{1, 0, 0, 0, 0}
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*trace.ResourceSpans{{
InstrumentationLibrarySpans: []*trace.InstrumentationLibrarySpans{{
Spans: []*trace.Span{{
TraceId: traceID,
SpanId: spanID,
Name: "span_with_event",
Events: []*trace.Span_Event{{
TimeUnixNano: 12345,
Name: "span_link",
Attributes: []*common.KeyValue{{
Key: "event_attr_key", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "event_attr_val"}},
}},
}},
}},
}},
}},
}
_, err := router.Export(ctx, req)
if err != nil {
t.Errorf(`Unexpected error: %s`, err)
}

time.Sleep(conf.SendTickerVal * 2)

mockTransmission.Mux.Lock()
assert.Equal(t, 2, len(mockTransmission.Events))

spanEvent := mockTransmission.Events[0]
// assert.Equal(t, time.Unix(0, int64(12345)).UTC(), spanEvent.Timestamp)
assert.Equal(t, bytesToTraceID(traceID), spanEvent.Data["trace.trace_id"])
assert.Equal(t, hex.EncodeToString(spanID), spanEvent.Data["trace.span_id"])
assert.Equal(t, "span_link", spanEvent.Data["span.name"])
assert.Equal(t, "span_with_event", spanEvent.Data["parent.name"])
assert.Equal(t, "span_event", spanEvent.Data["meta.annotation_type"])
assert.Equal(t, "event_attr_key", spanEvent.Data["event_attr_val"])
mockTransmission.Mux.Unlock()
mockTransmission.Flush()
})

t.Run("creates events for span links", func(t *testing.T) {
t.Skip("need additional work to support inspecting outbound JSON")

traceID := []byte{0, 0, 0, 0, 1}
spanID := []byte{1, 0, 0, 0, 0}
linkTraceID := []byte{0, 0, 0, 0, 2}
linkSpanID := []byte{2, 0, 0, 0, 0}

req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*trace.ResourceSpans{{
InstrumentationLibrarySpans: []*trace.InstrumentationLibrarySpans{{
Spans: []*trace.Span{{
Name: "span_with_link",
TraceId: traceID,
SpanId: spanID,
Links: []*trace.Span_Link{{
TraceId: traceID,
SpanId: spanID,
TraceState: "link_trace_state",
Attributes: []*common.KeyValue{{
Key: "link_attr_key", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "link_attr_val"}},
}},
}},
}},
}},
}},
}
_, err := router.Export(ctx, req)
if err != nil {
t.Errorf(`Unexpected error: %s`, err)
}

time.Sleep(conf.SendTickerVal * 2)
assert.Equal(t, 2, len(mockTransmission.Events))

spanLink := mockTransmission.Events[1]
assert.Equal(t, bytesToTraceID(traceID), spanLink.Data["trace.trace_id"])
assert.Equal(t, hex.EncodeToString(spanID), spanLink.Data["trace.span_id"])
assert.Equal(t, bytesToTraceID(linkTraceID), spanLink.Data["trace.link.trace_id"])
assert.Equal(t, hex.EncodeToString(linkSpanID), spanLink.Data["trace.link.span_id"])
assert.Equal(t, "link", spanLink.Data["meta.annotation_type"])
assert.Equal(t, "link_attr_val", spanLink.Data["link_attr_key"])
mockTransmission.Flush()
})
}

Expand Down