diff --git a/route/middleware.go b/route/middleware.go index 8264bc7608..7836433853 100644 --- a/route/middleware.go +++ b/route/middleware.go @@ -45,7 +45,7 @@ func (r *Router) apiKeyChecker(next http.Handler) http.Handler { return } } - err = errors.New(fmt.Sprintf("api key %s not found in list of authed keys", apiKey)) + err = fmt.Errorf("api key %s not found in list of authed keys", apiKey) r.handlerReturnWithError(w, ErrAuthNeeded, err) }) } diff --git a/route/route.go b/route/route.go index a43d9f5de8..437a7c1b19 100644 --- a/route/route.go +++ b/route/route.go @@ -477,8 +477,8 @@ func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServ eventAttrs[k] = v } - events := make([]*types.Event, len(span.Events)+1) - events[0] = &types.Event{ + events := make([]*types.Event, 0, 1+len(span.Events)+len(span.Links)) + events = append(events, &types.Event{ Context: ctx, APIHost: apiHost, APIKey: apiKey, @@ -486,9 +486,9 @@ func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServ SampleRate: uint(sampleRate), Timestamp: timestamp, Data: eventAttrs, - } + }) - for i, sevent := range span.Events { + for _, sevent := range span.Events { timestamp := time.Unix(0, int64(sevent.TimeUnixNano)).UTC() attrs := map[string]interface{}{ "trace.trace_id": traceID, @@ -501,6 +501,9 @@ func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServ if sevent.Attributes != nil { addAttributesToMap(attrs, sevent.Attributes) } + for k, v := range resourceAttrs { + attrs[k] = v + } sampleRate, err := getSampleRateFromAttributes(attrs) if err != nil { debugLog. @@ -508,7 +511,7 @@ func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServ WithField("sampleRate", attrs["sampleRate"]). Logf("error parsing sampleRate") } - events[i+1] = &types.Event{ + events = append(events, &types.Event{ Context: ctx, APIHost: apiHost, APIKey: apiKey, @@ -516,7 +519,41 @@ func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServ SampleRate: uint(sampleRate), Timestamp: timestamp, Data: attrs, + }) + } + + for _, slink := range span.Links { + attrs := map[string]interface{}{ + "trace.trace_id": traceID, + "trace.parent_id": spanID, + "trace.link.trace_id": bytesToTraceID(slink.TraceId), + "trace.link.span_id": hex.EncodeToString(slink.SpanId), + "parent_name": span.Name, + "meta.annotation_type": "link", + } + + if slink.Attributes != nil { + addAttributesToMap(attrs, slink.Attributes) + } + for k, v := range resourceAttrs { + attrs[k] = v + } + sampleRate, err := getSampleRateFromAttributes(attrs) + if err != nil { + debugLog. + WithField("error", err.Error()). + WithField("sampleRate", attrs["sampleRate"]). + Logf("error parsing sampleRate") } + events = append(events, &types.Event{ + Context: ctx, + APIHost: apiHost, + APIKey: apiKey, + Dataset: dataset, + SampleRate: uint(sampleRate), + Timestamp: time.Time{}, //links don't have timestamps, so use empty time + Data: attrs, + }) } for _, evt := range events { @@ -886,7 +923,7 @@ func getSampleRateFromAttributes(attrs map[string]interface{}) (int, error) { sampleRate = int(v) } default: - err = fmt.Errorf("Unrecognised sampleRate datatype - %T", sampleRate) + err = fmt.Errorf("unrecognised sampleRate datatype - %T", sampleRate) sampleRate = defaultSampleRate } // remove sampleRate from event fields diff --git a/route/route_test.go b/route/route_test.go index 8cb5bebf1f..1522d7df59 100644 --- a/route/route_test.go +++ b/route/route_test.go @@ -1,11 +1,10 @@ -// +build all race - package route import ( "bytes" "compress/gzip" "context" + "encoding/hex" "fmt" "io" "io/ioutil" @@ -25,6 +24,7 @@ import ( "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/transmit" + "github.com/stretchr/testify/assert" "github.com/gorilla/mux" "github.com/honeycombio/refinery/sharder" @@ -405,16 +405,29 @@ func TestOTLPHandler(t *testing.T) { mockMetrics := metrics.MockMetrics{} mockMetrics.Start() + mockTransmission := &transmit.MockTransmission{} + mockTransmission.Start() router := &Router{ Config: &config.MockConfig{}, Metrics: &mockMetrics, - UpstreamTransmission: &transmit.MockTransmission{}, + UpstreamTransmission: mockTransmission, iopLogger: iopLogger{ - Logger: &logger.MockLogger{}, + Logger: &logger.NullLogger{}, incomingOrPeer: "incoming", }, } + conf := &config.MockConfig{ + GetSendDelayVal: 0, + GetTraceTimeoutVal: 60 * time.Second, + GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1}, + SendTickerVal: 2 * time.Millisecond, + GetInMemoryCollectorCacheCapacityVal: config.InMemoryCollectorCacheCapacity{ + CacheCapacity: 100, + MaxAlloc: 100, + }, + } + t.Run("span with status", func(t *testing.T) { req := &collectortrace.ExportTraceServiceRequest{ ResourceSpans: []*trace.ResourceSpans{{ @@ -427,6 +440,8 @@ func TestOTLPHandler(t *testing.T) { if err != nil { t.Errorf(`Unexpected error: %s`, err) } + assert.Equal(t, 2, len(mockTransmission.Events)) + mockTransmission.Flush() }) t.Run("span without status", func(t *testing.T) { @@ -441,6 +456,100 @@ func TestOTLPHandler(t *testing.T) { if err != nil { t.Errorf(`Unexpected error: %s`, err) } + assert.Equal(t, 2, len(mockTransmission.Events)) + mockTransmission.Flush() + }) + + // TODO: (MG) figuure out how we can test JSON created from OTLP requests + // Below is example, but requires significant usage of collector, sampler, conf, etc + t.Run("creates events for span events", func(t *testing.T) { + t.Skip("need additional work to support inspecting outbound JSON") + + traceID := []byte{0, 0, 0, 0, 1} + spanID := []byte{1, 0, 0, 0, 0} + req := &collectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*trace.ResourceSpans{{ + InstrumentationLibrarySpans: []*trace.InstrumentationLibrarySpans{{ + Spans: []*trace.Span{{ + TraceId: traceID, + SpanId: spanID, + Name: "span_with_event", + Events: []*trace.Span_Event{{ + TimeUnixNano: 12345, + Name: "span_link", + Attributes: []*common.KeyValue{{ + Key: "event_attr_key", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "event_attr_val"}}, + }}, + }}, + }}, + }}, + }}, + } + _, err := router.Export(ctx, req) + if err != nil { + t.Errorf(`Unexpected error: %s`, err) + } + + time.Sleep(conf.SendTickerVal * 2) + + mockTransmission.Mux.Lock() + assert.Equal(t, 2, len(mockTransmission.Events)) + + spanEvent := mockTransmission.Events[0] + // assert.Equal(t, time.Unix(0, int64(12345)).UTC(), spanEvent.Timestamp) + assert.Equal(t, bytesToTraceID(traceID), spanEvent.Data["trace.trace_id"]) + assert.Equal(t, hex.EncodeToString(spanID), spanEvent.Data["trace.span_id"]) + assert.Equal(t, "span_link", spanEvent.Data["span.name"]) + assert.Equal(t, "span_with_event", spanEvent.Data["parent.name"]) + assert.Equal(t, "span_event", spanEvent.Data["meta.annotation_type"]) + assert.Equal(t, "event_attr_key", spanEvent.Data["event_attr_val"]) + mockTransmission.Mux.Unlock() + mockTransmission.Flush() + }) + + t.Run("creates events for span links", func(t *testing.T) { + t.Skip("need additional work to support inspecting outbound JSON") + + traceID := []byte{0, 0, 0, 0, 1} + spanID := []byte{1, 0, 0, 0, 0} + linkTraceID := []byte{0, 0, 0, 0, 2} + linkSpanID := []byte{2, 0, 0, 0, 0} + + req := &collectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*trace.ResourceSpans{{ + InstrumentationLibrarySpans: []*trace.InstrumentationLibrarySpans{{ + Spans: []*trace.Span{{ + Name: "span_with_link", + TraceId: traceID, + SpanId: spanID, + Links: []*trace.Span_Link{{ + TraceId: traceID, + SpanId: spanID, + TraceState: "link_trace_state", + Attributes: []*common.KeyValue{{ + Key: "link_attr_key", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "link_attr_val"}}, + }}, + }}, + }}, + }}, + }}, + } + _, err := router.Export(ctx, req) + if err != nil { + t.Errorf(`Unexpected error: %s`, err) + } + + time.Sleep(conf.SendTickerVal * 2) + assert.Equal(t, 2, len(mockTransmission.Events)) + + spanLink := mockTransmission.Events[1] + assert.Equal(t, bytesToTraceID(traceID), spanLink.Data["trace.trace_id"]) + assert.Equal(t, hex.EncodeToString(spanID), spanLink.Data["trace.span_id"]) + assert.Equal(t, bytesToTraceID(linkTraceID), spanLink.Data["trace.link.trace_id"]) + assert.Equal(t, hex.EncodeToString(linkSpanID), spanLink.Data["trace.link.span_id"]) + assert.Equal(t, "link", spanLink.Data["meta.annotation_type"]) + assert.Equal(t, "link_attr_val", spanLink.Data["link_attr_key"]) + mockTransmission.Flush() }) }