diff --git a/Rakefile b/Rakefile index fc3288734..4622cd394 100644 --- a/Rakefile +++ b/Rakefile @@ -67,6 +67,7 @@ EXCLUDE_LINT = [ 'model/services_gen.go', 'model/trace_gen.go', 'model/span_gen.go', + 'model/span.pb.go', ] task :default => [:ci] @@ -168,3 +169,8 @@ task :ci => [:fmt, :vet, :lint, :test, :build, :windows] task :err do sh "errcheck github.com/DataDog/datadog-trace-agent" end + +task :protobuf do + # be sure to have protobuf 3.x and go vendor installed + sh "protoc -I=model -I=vendor --gogofaster_out=model model/*.proto" +end diff --git a/agent/agent.go b/agent/agent.go index d15b551de..987aa3e76 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -24,10 +24,11 @@ const ( ) type processedTrace struct { - Trace model.Trace - Root *model.Span - Env string - Sublayers []model.SublayerValue + Trace model.Trace + WeightedTrace model.WeightedTrace + Root *model.Span + Env string + Sublayers []model.SublayerValue } func (pt *processedTrace) weight() float64 { @@ -217,7 +218,10 @@ func (a *Agent) Process(t model.Trace) { rate *= a.Receiver.preSampler.Rate() sampler.SetTraceAppliedSampleRate(root, rate) + // Need to do this computation before entering the concentrator + // as they access the Metrics map, which is not thread safe. t.ComputeTopLevel() + wt := model.NewWeightedTrace(t, root) sublayers := model.ComputeSublayers(t) model.SetSublayersOnSpan(root, sublayers) @@ -228,19 +232,16 @@ func (a *Agent) Process(t model.Trace) { } pt := processedTrace{ - Trace: t, - Root: root, - Env: a.conf.DefaultEnv, - Sublayers: sublayers, + Trace: t, + WeightedTrace: wt, + Root: root, + Env: a.conf.DefaultEnv, + Sublayers: sublayers, } if tenv := t.GetEnv(); tenv != "" { pt.Env = tenv } - // Need to do this computation before entering the concentrator - // as they access the Metrics map, which is not thread safe. - t.ComputeWeight(*root) - t.ComputeTopLevel() go func() { defer watchdog.LogOnPanic() a.Concentrator.Add(pt) diff --git a/agent/concentrator.go b/agent/concentrator.go index ce9073fe8..fbc7693e1 100644 --- a/agent/concentrator.go +++ b/agent/concentrator.go @@ -37,7 +37,7 @@ func NewConcentrator(aggregators []string, bsize int64) *Concentrator { func (c *Concentrator) Add(t processedTrace) { c.mu.Lock() - for _, s := range t.Trace { + for _, s := range t.WeightedTrace { btime := s.End() - s.End()%c.bsize b, ok := c.buckets[btime] if !ok { diff --git a/agent/concentrator_test.go b/agent/concentrator_test.go index d7f732dc9..6af0b5cd4 100644 --- a/agent/concentrator_test.go +++ b/agent/concentrator_test.go @@ -44,27 +44,30 @@ func TestConcentratorStatsCounts(t *testing.T) { now := model.Now() alignedNow := now - now%c.bsize + trace := model.Trace{ + // first bucket + testSpan(c, 1, 24, 3, "A1", "resource1", 0), + testSpan(c, 2, 12, 3, "A1", "resource1", 2), + testSpan(c, 3, 40, 3, "A2", "resource2", 2), + testSpan(c, 4, 300000000000, 3, "A2", "resource2", 2), // 5 minutes trace + testSpan(c, 5, 30, 3, "A2", "resourcefoo", 0), + // second bucket + testSpan(c, 6, 24, 2, "A1", "resource2", 0), + testSpan(c, 7, 12, 2, "A1", "resource1", 2), + testSpan(c, 8, 40, 2, "A2", "resource1", 2), + testSpan(c, 9, 30, 2, "A2", "resource2", 2), + testSpan(c, 10, 3600000000000, 2, "A2", "resourcefoo", 0), // 1 hour trace + // third bucket - but should not be flushed because it's the second to last + testSpan(c, 6, 24, 1, "A1", "resource2", 0), + } + trace.ComputeTopLevel() + wt := model.NewWeightedTrace(trace, trace.GetRoot()) + testTrace := processedTrace{ - Env: "none", - Trace: model.Trace{ - // first bucket - testSpan(c, 1, 24, 3, "A1", "resource1", 0), - testSpan(c, 2, 12, 3, "A1", "resource1", 2), - testSpan(c, 3, 40, 3, "A2", "resource2", 2), - testSpan(c, 4, 300000000000, 3, "A2", "resource2", 2), // 5 minutes trace - testSpan(c, 5, 30, 3, "A2", "resourcefoo", 0), - // second bucket - testSpan(c, 6, 24, 2, "A1", "resource2", 0), - testSpan(c, 7, 12, 2, "A1", "resource1", 2), - testSpan(c, 8, 40, 2, "A2", "resource1", 2), - testSpan(c, 9, 30, 2, "A2", "resource2", 2), - testSpan(c, 10, 3600000000000, 2, "A2", "resourcefoo", 0), // 1 hour trace - // third bucket - but should not be flushed because it's the second to last - testSpan(c, 6, 24, 1, "A1", "resource2", 0), - }, + Env: "none", + Trace: trace, + WeightedTrace: wt, } - testTrace.Trace.ComputeWeight(*testTrace.Trace.GetRoot()) - testTrace.Trace.ComputeTopLevel() c.Add(testTrace) stats := c.Flush() diff --git a/agent/model_test.go b/agent/model_test.go index 57d5acfb6..d80a0cec0 100644 --- a/agent/model_test.go +++ b/agent/model_test.go @@ -23,9 +23,9 @@ func BenchmarkHandleSpanRandom(b *testing.B) { for i := 0; i < b.N; i++ { trace := fixtures.RandomTrace(10, 8) root := trace.GetRoot() - trace.ComputeWeight(*root) trace.ComputeTopLevel() - for _, span := range trace { + wt := model.NewWeightedTrace(trace, root) + for _, span := range wt { sb.HandleSpan(span, defaultEnv, aggr, nil) } } diff --git a/fixtures/span.go b/fixtures/span.go index 52fadc8ed..2da0666bc 100644 --- a/fixtures/span.go +++ b/fixtures/span.go @@ -265,6 +265,16 @@ func RandomSpan() model.Span { } } +// RandomWeightedSpan generates a random weighted span, useful for stats tests +func RandomWeightedSpan() *model.WeightedSpan { + s := RandomSpan() + return &model.WeightedSpan{ + Span: &s, + Weight: 1, + TopLevel: true, + } +} + // GetTestSpan returns a Span with different fields set func GetTestSpan() model.Span { span := model.Span{ @@ -281,7 +291,6 @@ func GetTestSpan() model.Span { Metrics: map[string]float64{"http.monitor": 41.99}, } trace := model.Trace{span} - trace.ComputeWeight(model.Span{}) trace.ComputeTopLevel() return trace[0] } @@ -307,8 +316,15 @@ func TestSpan() model.Span { ParentID: 1111, Type: "http", } - trace := model.Trace{span} - trace.ComputeWeight(model.Span{}) - trace.ComputeTopLevel() - return trace[0] + return span +} + +// TestWeightedSpan returns a static test weighted span for reproductive stats tests +func TestWeightedSpan() *model.WeightedSpan { + s := TestSpan() + return &model.WeightedSpan{ + Span: &s, + Weight: 1, + TopLevel: true, + } } diff --git a/fixtures/stats.go b/fixtures/stats.go index b93b27cf7..94679ccef 100644 --- a/fixtures/stats.go +++ b/fixtures/stats.go @@ -2,6 +2,7 @@ package fixtures import ( "encoding/json" + "github.com/DataDog/datadog-trace-agent/model" ) @@ -12,7 +13,7 @@ const defaultEnv = "none" // TestStatsBucket returns a fixed stats bucket to be used in unit tests func TestStatsBucket() model.StatsBucket { srb := model.NewStatsRawBucket(0, 1e9) - srb.HandleSpan(TestSpan(), defaultEnv, defaultAggregators, nil) + srb.HandleSpan(TestWeightedSpan(), defaultEnv, defaultAggregators, nil) sb := srb.Export() // marshalling then unmarshalling data to: @@ -33,9 +34,9 @@ func TestStatsBucket() model.StatsBucket { } // StatsBucketWithSpans returns a stats bucket populated with spans stats -func StatsBucketWithSpans(s []model.Span) model.StatsBucket { +func StatsBucketWithSpans(spans []*model.WeightedSpan) model.StatsBucket { srb := model.NewStatsRawBucket(0, 1e9) - for _, s := range s { + for _, s := range spans { srb.HandleSpan(s, defaultEnv, defaultAggregators, nil) } return srb.Export() @@ -43,9 +44,9 @@ func StatsBucketWithSpans(s []model.Span) model.StatsBucket { // RandomStatsBucket returns a bucket made from n random spans, useful to run benchmarks and tests func RandomStatsBucket(n int) model.StatsBucket { - spans := make([]model.Span, 0, n) + spans := make([]*model.WeightedSpan, 0, n) for i := 0; i < n; i++ { - spans = append(spans, RandomSpan()) + spans = append(spans, RandomWeightedSpan()) } return StatsBucketWithSpans(spans) diff --git a/glide.lock b/glide.lock index c74b609e4..109e6c4ac 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 34b8446bdae5494c2d28db02226a944af14466e1bb9a9ed1ba28f26a8949780f -updated: 2017-11-29T12:30:01.732107+01:00 +hash: b5bd8505348ccf843ecec4e5f6c53980dd936c6d950dbaf22d4443ed037e625f +updated: 2017-12-04T13:48:56.549159484+01:00 imports: - name: github.com/cihub/seelog version: d2c6e5aa9fbfdd1c624e140287063c7730654115 @@ -17,6 +17,15 @@ imports: version: de8695c8edbf8236f30d6e1376e20b198a028d42 subpackages: - oleutil +- name: github.com/gogo/protobuf + version: 342cbe0a04158f6dcb03ca0079991a51a4248c02 + subpackages: + - gogoproto +- name: github.com/golang/protobuf + version: 1e59b77b52bf8e4b449a57e6f79f21226d571845 + subpackages: + - proto + - protoc-gen-go - name: github.com/golang/tools version: 219e654bb7266d3b73c4610ed24c33d12560826a subpackages: diff --git a/glide.yaml b/glide.yaml index 6ada4b321..479806267 100644 --- a/glide.yaml +++ b/glide.yaml @@ -29,6 +29,14 @@ import: - windows/svc/debug - windows/svc/eventlog - windows/svc/mgr +- package: github.com/golang/protobuf + subpackages: + - protoc-gen-go + - proto +- package: github.com/gogo/protobuf + version: v0.5 + subpackages: + - gogoproto testImport: - package: github.com/stretchr/testify version: v1.1.4 diff --git a/model/normalizer_test.go b/model/normalizer_test.go index ad16ef920..47e8bbcf2 100644 --- a/model/normalizer_test.go +++ b/model/normalizer_test.go @@ -227,7 +227,7 @@ func TestNormalizeTraceTraceIdMismatch(t *testing.T) { span2 := testSpan() span2.TraceID = 2 - trace := Trace{span1, span2} + trace := Trace{*span1, *span2} _, err := NormalizeTrace(trace) assert.Error(t, err) @@ -239,7 +239,7 @@ func TestNormalizeTraceInvalidSpan(t *testing.T) { span2 := testSpan() span2.Name = "" // invalid - trace := Trace{span1, span2} + trace := Trace{*span1, *span2} _, err := NormalizeTrace(trace) assert.Error(t, err) @@ -250,7 +250,7 @@ func TestNormalizeTraceDuplicateSpanID(t *testing.T) { span2 := testSpan() span2.SpanID = span1.SpanID - trace := Trace{span1, span2} + trace := Trace{*span1, *span2} _, err := NormalizeTrace(trace) assert.Error(t, err) @@ -262,7 +262,7 @@ func TestNormalizeTrace(t *testing.T) { span2 := testSpan() span2.SpanID++ - trace := Trace{span1, span2} + trace := Trace{*span1, *span2} _, err := NormalizeTrace(trace) assert.NoError(t, err) diff --git a/model/span.go b/model/span.go index e9590bfdc..2da067449 100644 --- a/model/span.go +++ b/model/span.go @@ -1,62 +1,21 @@ package model import ( - "bytes" - "fmt" "math/rand" ) const ( // SpanSampleRateMetricKey is the metric key holding the sample rate SpanSampleRateMetricKey = "_sample_rate" + // Fake type of span to indicate it is time to flush + flushMarkerType = "_FLUSH_MARKER" ) -// Span is the common struct we use to represent a dapper-like span -type Span struct { - // Mandatory - // Service & Name together determine what software we are measuring - Service string `json:"service" msg:"service"` // the software running (e.g. pylons) - Name string `json:"name" msg:"name"` // the metric name aka. the thing we're measuring (e.g. pylons.render OR psycopg2.query) - Resource string `json:"resource" msg:"resource"` // the natural key of what we measure (/index OR SELECT * FROM a WHERE id = ?) - TraceID uint64 `json:"trace_id" msg:"trace_id"` // ID that all spans in the same trace share - SpanID uint64 `json:"span_id" msg:"span_id"` // unique ID given to any span - Start int64 `json:"start" msg:"start"` // nanosecond epoch of span start - Duration int64 `json:"duration" msg:"duration"` // in nanoseconds - Error int32 `json:"error" msg:"error"` // error status of the span, 0 == OK - - // Optional - Meta map[string]string `json:"meta" msg:"meta"` // arbitrary tags/metadata - Metrics map[string]float64 `json:"metrics" msg:"metrics"` // arbitrary metrics - ParentID uint64 `json:"parent_id" msg:"parent_id"` // span ID of the span in which this one was created - Type string `json:"type" msg:"type"` // protocol associated with the span - - // Those are cached information, they are here not only for optimization, - // but because the func which fill their values read - // the Metrics map and causes map read/write concurrent accesses. - weight float64 // caches the result of Weight() called on the root span - topLevel bool // caches the result of TopLevel() -} - -// String formats a Span struct to be displayed as a string -func (s Span) String() string { - return fmt.Sprintf( - "Span[t_id:%d,s_id:%d,p_id:%d,ser:%s,name:%s,res:%s]", - s.TraceID, - s.SpanID, - s.ParentID, - s.Service, - s.Name, - s.Resource, - ) -} - // RandomID generates a random uint64 that we use for IDs func RandomID() uint64 { return uint64(rand.Int63()) } -const flushMarkerType = "_FLUSH_MARKER" - // IsFlushMarker tells if this is a marker span, which signals the system to flush func (s *Span) IsFlushMarker() bool { return s.Type == flushMarkerType @@ -85,41 +44,3 @@ func (s *Span) Weight() float64 { return 1.0 / sampleRate } - -// Spans is a slice of span pointers -type Spans []*Span - -func (spans Spans) String() string { - var buf bytes.Buffer - - buf.WriteString("Spans{") - - for i, span := range spans { - if i > 0 { - buf.WriteString(", ") - } - fmt.Fprintf(&buf, "%v", span) - } - - buf.WriteByte('}') - - return buf.String() -} - -// GoString returns a description of a slice of spans. -func (spans Spans) GoString() string { - var buf bytes.Buffer - - buf.WriteString("Spans{") - - for i, span := range spans { - if i > 0 { - buf.WriteString(", ") - } - fmt.Fprintf(&buf, "%#v", span) - } - - buf.WriteByte('}') - - return buf.String() -} diff --git a/model/span.pb.go b/model/span.pb.go new file mode 100644 index 000000000..b8dc45a29 --- /dev/null +++ b/model/span.pb.go @@ -0,0 +1,917 @@ +// Code generated by protoc-gen-gogo. +// source: span.proto +// DO NOT EDIT! + +/* + Package model is a generated protocol buffer package. + + It is generated from these files: + span.proto + trace.proto + trace_payload.proto + + It has these top-level messages: + Span + Trace + TracePayload +*/ +package model + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" +import _ "github.com/gogo/protobuf/gogoproto" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type Span struct { + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service" msg:"service"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name" msg:"name"` + Resource string `protobuf:"bytes,3,opt,name=resource,proto3" json:"resource" msg:"resource"` + TraceID uint64 `protobuf:"varint,4,opt,name=traceID,proto3" json:"trace_id" msg:"trace_id"` + SpanID uint64 `protobuf:"varint,5,opt,name=spanID,proto3" json:"span_id" msg:"span_id"` + ParentID uint64 `protobuf:"varint,6,opt,name=parentID,proto3" json:"parent_id" msg:"parent_id"` + Start int64 `protobuf:"varint,7,opt,name=start,proto3" json:"start" msg:"start"` + Duration int64 `protobuf:"varint,8,opt,name=duration,proto3" json:"duration" msg:"duration"` + Error int32 `protobuf:"varint,9,opt,name=error,proto3" json:"error" msg:"error"` + Meta map[string]string `protobuf:"bytes,10,rep,name=meta" json:"meta" msg:"meta" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Metrics map[string]float64 `protobuf:"bytes,11,rep,name=metrics" json:"metrics" msg:"metrics" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"fixed64,2,opt,name=value,proto3"` + Type string `protobuf:"bytes,12,opt,name=type,proto3" json:"type" msg:"type"` +} + +func (m *Span) Reset() { *m = Span{} } +func (m *Span) String() string { return proto.CompactTextString(m) } +func (*Span) ProtoMessage() {} +func (*Span) Descriptor() ([]byte, []int) { return fileDescriptorSpan, []int{0} } + +func (m *Span) GetMeta() map[string]string { + if m != nil { + return m.Meta + } + return nil +} + +func (m *Span) GetMetrics() map[string]float64 { + if m != nil { + return m.Metrics + } + return nil +} + +func init() { + proto.RegisterType((*Span)(nil), "model.Span") +} +func (m *Span) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Span) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Service) > 0 { + data[i] = 0xa + i++ + i = encodeVarintSpan(data, i, uint64(len(m.Service))) + i += copy(data[i:], m.Service) + } + if len(m.Name) > 0 { + data[i] = 0x12 + i++ + i = encodeVarintSpan(data, i, uint64(len(m.Name))) + i += copy(data[i:], m.Name) + } + if len(m.Resource) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintSpan(data, i, uint64(len(m.Resource))) + i += copy(data[i:], m.Resource) + } + if m.TraceID != 0 { + data[i] = 0x20 + i++ + i = encodeVarintSpan(data, i, uint64(m.TraceID)) + } + if m.SpanID != 0 { + data[i] = 0x28 + i++ + i = encodeVarintSpan(data, i, uint64(m.SpanID)) + } + if m.ParentID != 0 { + data[i] = 0x30 + i++ + i = encodeVarintSpan(data, i, uint64(m.ParentID)) + } + if m.Start != 0 { + data[i] = 0x38 + i++ + i = encodeVarintSpan(data, i, uint64(m.Start)) + } + if m.Duration != 0 { + data[i] = 0x40 + i++ + i = encodeVarintSpan(data, i, uint64(m.Duration)) + } + if m.Error != 0 { + data[i] = 0x48 + i++ + i = encodeVarintSpan(data, i, uint64(m.Error)) + } + if len(m.Meta) > 0 { + for k, _ := range m.Meta { + data[i] = 0x52 + i++ + v := m.Meta[k] + mapSize := 1 + len(k) + sovSpan(uint64(len(k))) + 1 + len(v) + sovSpan(uint64(len(v))) + i = encodeVarintSpan(data, i, uint64(mapSize)) + data[i] = 0xa + i++ + i = encodeVarintSpan(data, i, uint64(len(k))) + i += copy(data[i:], k) + data[i] = 0x12 + i++ + i = encodeVarintSpan(data, i, uint64(len(v))) + i += copy(data[i:], v) + } + } + if len(m.Metrics) > 0 { + for k, _ := range m.Metrics { + data[i] = 0x5a + i++ + v := m.Metrics[k] + mapSize := 1 + len(k) + sovSpan(uint64(len(k))) + 1 + 8 + i = encodeVarintSpan(data, i, uint64(mapSize)) + data[i] = 0xa + i++ + i = encodeVarintSpan(data, i, uint64(len(k))) + i += copy(data[i:], k) + data[i] = 0x11 + i++ + i = encodeFixed64Span(data, i, uint64(math.Float64bits(float64(v)))) + } + } + if len(m.Type) > 0 { + data[i] = 0x62 + i++ + i = encodeVarintSpan(data, i, uint64(len(m.Type))) + i += copy(data[i:], m.Type) + } + return i, nil +} + +func encodeFixed64Span(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Span(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintSpan(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *Span) Size() (n int) { + var l int + _ = l + l = len(m.Service) + if l > 0 { + n += 1 + l + sovSpan(uint64(l)) + } + l = len(m.Name) + if l > 0 { + n += 1 + l + sovSpan(uint64(l)) + } + l = len(m.Resource) + if l > 0 { + n += 1 + l + sovSpan(uint64(l)) + } + if m.TraceID != 0 { + n += 1 + sovSpan(uint64(m.TraceID)) + } + if m.SpanID != 0 { + n += 1 + sovSpan(uint64(m.SpanID)) + } + if m.ParentID != 0 { + n += 1 + sovSpan(uint64(m.ParentID)) + } + if m.Start != 0 { + n += 1 + sovSpan(uint64(m.Start)) + } + if m.Duration != 0 { + n += 1 + sovSpan(uint64(m.Duration)) + } + if m.Error != 0 { + n += 1 + sovSpan(uint64(m.Error)) + } + if len(m.Meta) > 0 { + for k, v := range m.Meta { + _ = k + _ = v + mapEntrySize := 1 + len(k) + sovSpan(uint64(len(k))) + 1 + len(v) + sovSpan(uint64(len(v))) + n += mapEntrySize + 1 + sovSpan(uint64(mapEntrySize)) + } + } + if len(m.Metrics) > 0 { + for k, v := range m.Metrics { + _ = k + _ = v + mapEntrySize := 1 + len(k) + sovSpan(uint64(len(k))) + 1 + 8 + n += mapEntrySize + 1 + sovSpan(uint64(mapEntrySize)) + } + } + l = len(m.Type) + if l > 0 { + n += 1 + l + sovSpan(uint64(l)) + } + return n +} + +func sovSpan(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozSpan(x uint64) (n int) { + return sovSpan(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Span) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Span: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Span: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Service", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSpan + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Service = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSpan + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Resource", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSpan + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Resource = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TraceID", wireType) + } + m.TraceID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.TraceID |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SpanID", wireType) + } + m.SpanID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.SpanID |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ParentID", wireType) + } + m.ParentID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.ParentID |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Start", wireType) + } + m.Start = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Start |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Duration", wireType) + } + m.Duration = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Duration |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 9: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + m.Error = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Error |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 10: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Meta", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSpan + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + var keykey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + keykey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLenmapkey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthSpan + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey := string(data[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + if m.Meta == nil { + m.Meta = make(map[string]string) + } + if iNdEx < postIndex { + var valuekey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + valuekey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + var stringLenmapvalue uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLenmapvalue |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapvalue := int(stringLenmapvalue) + if intStringLenmapvalue < 0 { + return ErrInvalidLengthSpan + } + postStringIndexmapvalue := iNdEx + intStringLenmapvalue + if postStringIndexmapvalue > l { + return io.ErrUnexpectedEOF + } + mapvalue := string(data[iNdEx:postStringIndexmapvalue]) + iNdEx = postStringIndexmapvalue + m.Meta[mapkey] = mapvalue + } else { + var mapvalue string + m.Meta[mapkey] = mapvalue + } + iNdEx = postIndex + case 11: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metrics", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthSpan + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + var keykey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + keykey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLenmapkey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthSpan + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey := string(data[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + if m.Metrics == nil { + m.Metrics = make(map[string]float64) + } + if iNdEx < postIndex { + var valuekey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + valuekey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + var mapvaluetemp uint64 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + iNdEx += 8 + mapvaluetemp = uint64(data[iNdEx-8]) + mapvaluetemp |= uint64(data[iNdEx-7]) << 8 + mapvaluetemp |= uint64(data[iNdEx-6]) << 16 + mapvaluetemp |= uint64(data[iNdEx-5]) << 24 + mapvaluetemp |= uint64(data[iNdEx-4]) << 32 + mapvaluetemp |= uint64(data[iNdEx-3]) << 40 + mapvaluetemp |= uint64(data[iNdEx-2]) << 48 + mapvaluetemp |= uint64(data[iNdEx-1]) << 56 + mapvalue := math.Float64frombits(mapvaluetemp) + m.Metrics[mapkey] = mapvalue + } else { + var mapvalue float64 + m.Metrics[mapkey] = mapvalue + } + iNdEx = postIndex + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSpan + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Type = string(data[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSpan(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthSpan + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipSpan(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSpan + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSpan + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSpan + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthSpan + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSpan + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipSpan(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthSpan = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowSpan = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("span.proto", fileDescriptorSpan) } + +var fileDescriptorSpan = []byte{ + // 493 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x93, 0xcd, 0x8e, 0xd3, 0x30, + 0x10, 0xc7, 0xf1, 0x36, 0xe9, 0x87, 0xbb, 0xc0, 0xca, 0x02, 0x64, 0x55, 0x28, 0x89, 0x7c, 0x8a, + 0x90, 0xc8, 0x4a, 0x80, 0x60, 0x55, 0x71, 0xa1, 0x2a, 0x87, 0x1e, 0x90, 0x90, 0x79, 0x00, 0xe4, + 0xa6, 0xa6, 0x44, 0x6c, 0x3e, 0xe4, 0x38, 0x2b, 0xf5, 0x2d, 0x78, 0x0a, 0x9e, 0x85, 0x23, 0x4f, + 0x10, 0xa1, 0x72, 0xcb, 0xb1, 0x4f, 0x80, 0x3c, 0x4e, 0xcc, 0x8a, 0xcb, 0xde, 0xf2, 0xff, 0xcd, + 0xfc, 0x3d, 0x9e, 0xf1, 0x04, 0xe3, 0xba, 0x12, 0x45, 0x52, 0xa9, 0x52, 0x97, 0xc4, 0xcf, 0xcb, + 0x9d, 0xbc, 0x5e, 0x3c, 0xdf, 0x67, 0xfa, 0x6b, 0xb3, 0x4d, 0xd2, 0x32, 0xbf, 0xdc, 0x97, 0xfb, + 0xf2, 0x12, 0xa2, 0xdb, 0xe6, 0x0b, 0x28, 0x10, 0xf0, 0x65, 0x5d, 0xec, 0xc7, 0x18, 0x7b, 0x9f, + 0x2a, 0x51, 0x90, 0xd7, 0x78, 0x52, 0x4b, 0x75, 0x93, 0xa5, 0x92, 0xa2, 0x08, 0xc5, 0xb3, 0xd5, + 0xd3, 0xae, 0x0d, 0x07, 0x74, 0x6a, 0xc3, 0xfb, 0x79, 0xbd, 0x5f, 0xb2, 0x5e, 0x33, 0x3e, 0x44, + 0xc8, 0x33, 0xec, 0x15, 0x22, 0x97, 0xf4, 0x0c, 0x4c, 0x4f, 0xba, 0x36, 0x04, 0x7d, 0x6a, 0x43, + 0x0c, 0x0e, 0x23, 0x18, 0x07, 0x46, 0x96, 0x78, 0xaa, 0x64, 0x5d, 0x36, 0x2a, 0x95, 0x74, 0x04, + 0xf9, 0x41, 0xd7, 0x86, 0x8e, 0x9d, 0xda, 0xf0, 0x01, 0x78, 0x06, 0xc0, 0xb8, 0x8b, 0x91, 0x2b, + 0x3c, 0xd1, 0x4a, 0xa4, 0x72, 0xb3, 0xa6, 0x5e, 0x84, 0x62, 0xcf, 0x5a, 0x01, 0x7d, 0xce, 0x76, + 0xce, 0x3a, 0x00, 0xc6, 0x87, 0x74, 0xf2, 0x0a, 0x8f, 0xcd, 0x98, 0x36, 0x6b, 0xea, 0x83, 0xd1, + 0x36, 0x56, 0x89, 0xc2, 0xfa, 0xfa, 0xc6, 0xac, 0x66, 0xbc, 0xcf, 0x25, 0x6f, 0xf1, 0xb4, 0x12, + 0x4a, 0x16, 0x7a, 0xb3, 0xa6, 0x63, 0xf0, 0x45, 0x5d, 0x1b, 0xce, 0x2c, 0xb3, 0xce, 0x87, 0xe0, + 0x74, 0x84, 0x71, 0xe7, 0x20, 0x09, 0xf6, 0x6b, 0x2d, 0x94, 0xa6, 0x93, 0x08, 0xc5, 0xa3, 0x15, + 0xed, 0xda, 0xd0, 0x82, 0x53, 0x1b, 0xce, 0x6d, 0x41, 0xa3, 0x18, 0xb7, 0xd4, 0x4c, 0x66, 0xd7, + 0x28, 0xa1, 0xb3, 0xb2, 0xa0, 0x53, 0xb0, 0x40, 0x7b, 0x03, 0x73, 0xed, 0x0d, 0x80, 0x71, 0x17, + 0x33, 0xb5, 0xa4, 0x52, 0xa5, 0xa2, 0xb3, 0x08, 0xc5, 0xbe, 0xad, 0x05, 0xc0, 0xd5, 0x02, 0xc5, + 0xb8, 0xa5, 0xe4, 0x1d, 0xf6, 0x72, 0xa9, 0x05, 0xc5, 0xd1, 0x28, 0x9e, 0xbf, 0x78, 0x9c, 0xc0, + 0xde, 0x24, 0x66, 0x09, 0x92, 0x0f, 0x52, 0x8b, 0xf7, 0x85, 0x56, 0x07, 0xfb, 0x90, 0x26, 0xcd, + 0x3d, 0xa4, 0x11, 0x8c, 0x03, 0x23, 0x1f, 0xf1, 0x24, 0x97, 0x5a, 0x65, 0x69, 0x4d, 0xe7, 0x70, + 0x0a, 0xfd, 0xef, 0x14, 0x13, 0xb2, 0x07, 0xc1, 0xb4, 0xfb, 0x64, 0x37, 0xed, 0x5e, 0x33, 0x3e, + 0x44, 0xcc, 0x1a, 0xe9, 0x43, 0x25, 0xe9, 0xf9, 0xbf, 0x35, 0x32, 0xda, 0x55, 0x37, 0x82, 0x71, + 0x60, 0x8b, 0x37, 0x78, 0xe6, 0x2e, 0x4a, 0x2e, 0xf0, 0xe8, 0x9b, 0x3c, 0xd8, 0x9d, 0xe5, 0xe6, + 0x93, 0x3c, 0xc2, 0xfe, 0x8d, 0xb8, 0x6e, 0xfa, 0x95, 0xe4, 0x56, 0x2c, 0xcf, 0xae, 0xd0, 0x62, + 0x89, 0xcf, 0x6f, 0xdf, 0xed, 0x2e, 0x2f, 0xba, 0xe5, 0x5d, 0x5d, 0xfc, 0x3c, 0x06, 0xe8, 0xd7, + 0x31, 0x40, 0xbf, 0x8f, 0x01, 0xfa, 0xfe, 0x27, 0xb8, 0xb7, 0x1d, 0xc3, 0x1f, 0xf4, 0xf2, 0x6f, + 0x00, 0x00, 0x00, 0xff, 0xff, 0x94, 0x9b, 0x26, 0x0f, 0x85, 0x03, 0x00, 0x00, +} diff --git a/model/span.proto b/model/span.proto new file mode 100644 index 000000000..302e0807b --- /dev/null +++ b/model/span.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package model; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +message Span { + string service = 1 [(gogoproto.jsontag) = "service", (gogoproto.moretags) = "msg:\"service\""]; + string name = 2 [(gogoproto.jsontag) = "name", (gogoproto.moretags) = "msg:\"name\""]; + string resource = 3 [(gogoproto.jsontag) = "resource", (gogoproto.moretags) = "msg:\"resource\""]; + uint64 traceID = 4 [(gogoproto.jsontag) = "trace_id", (gogoproto.moretags) = "msg:\"trace_id\""]; + uint64 spanID = 5 [(gogoproto.jsontag) = "span_id", (gogoproto.moretags) = "msg:\"span_id\""]; + uint64 parentID = 6 [(gogoproto.jsontag) = "parent_id", (gogoproto.moretags) = "msg:\"parent_id\""]; + int64 start = 7 [(gogoproto.jsontag) = "start", (gogoproto.moretags) = "msg:\"start\""]; + int64 duration = 8 [(gogoproto.jsontag) = "duration", (gogoproto.moretags) = "msg:\"duration\""]; + int32 error = 9 [(gogoproto.jsontag) = "error", (gogoproto.moretags) = "msg:\"error\""]; + map meta = 10 [(gogoproto.jsontag) = "meta", (gogoproto.moretags) = "msg:\"meta\""]; + map metrics = 11 [(gogoproto.jsontag) = "metrics", (gogoproto.moretags) = "msg:\"metrics\""]; + string type = 12 [(gogoproto.jsontag) = "type", (gogoproto.moretags) = "msg:\"type\""]; +} diff --git a/model/span_test.go b/model/span_test.go index f5f521d77..13e91a9e1 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -6,8 +6,8 @@ import ( "github.com/stretchr/testify/assert" ) -func testSpan() Span { - return Span{ +func testSpan() *Span { + return &Span{ Duration: 10000000, Error: 0, Resource: "GET /some/raclette", diff --git a/model/stats_test.go b/model/stats_test.go index c286a42a7..359620574 100644 --- a/model/stats_test.go +++ b/model/stats_test.go @@ -14,7 +14,7 @@ import ( const defaultEnv = "default" -func testSpans() []Span { +func testWeightedSpans() WeightedTrace { spans := []Span{ Span{Service: "A", Name: "A.foo", Resource: "α", Duration: 1}, Span{Service: "A", Name: "A.foo", Resource: "β", Duration: 2, Error: 1}, @@ -25,11 +25,15 @@ func testSpans() []Span { Span{Service: "C", Name: "sql.query", Resource: "δ", Duration: 7}, Span{Service: "C", Name: "sql.query", Resource: "δ", Duration: 8}, } + tws := make(WeightedTrace, len(spans)) for i := range spans { - spans[i].weight = 1 - spans[i].topLevel = true + tws[i] = &WeightedSpan{ + Span: &spans[i], + Weight: 1, + TopLevel: true, + } } - return spans + return tws } func testTrace() Trace { @@ -52,7 +56,6 @@ func testTrace() Trace { Start: 10, Duration: 3, Error: 1}, } - trace.ComputeWeight(*trace.GetRoot()) trace.ComputeTopLevel() return trace } @@ -79,7 +82,6 @@ func testTraceTopLevel() Trace { Start: 10, Duration: 3, Error: 1}, } - trace.ComputeWeight(*trace.GetRoot()) trace.ComputeTopLevel() return trace } @@ -107,8 +109,8 @@ func TestStatsBucketDefault(t *testing.T) { // No custom aggregators only the defaults aggr := []string{} - for _, s := range testSpans() { - t.Logf("weight: %f, topLevel: %v", s.weight, s.topLevel) + for _, s := range testWeightedSpans() { + t.Logf("weight: %f, topLevel: %v", s.Weight, s.TopLevel) srb.HandleSpan(s, defaultEnv, aggr, nil) } sb := srb.Export() @@ -215,7 +217,7 @@ func TestStatsBucketExtraAggregators(t *testing.T) { // one custom aggregator aggr := []string{"version"} - for _, s := range testSpans() { + for _, s := range testWeightedSpans() { srb.HandleSpan(s, defaultEnv, aggr, nil) } sb := srb.Export() @@ -265,7 +267,11 @@ func TestStatsBucketMany(t *testing.T) { assert := assert.New(t) - templateSpan := Span{Service: "A", Name: "A.foo", Resource: "α", Duration: 7, weight: 1} + templateSpan := &WeightedSpan{ + Span: &Span{Service: "A", Name: "A.foo", Resource: "α", Duration: 7}, + Weight: 1, + TopLevel: true, + } const n = 100000 srb := NewStatsRawBucket(0, 1e9) @@ -302,13 +308,15 @@ func TestStatsBucketSublayers(t *testing.T) { root := tr.GetRoot() SetSublayersOnSpan(root, sublayers) + wt := NewWeightedTrace(tr, root) + assert.NotNil(sublayers) srb := NewStatsRawBucket(0, 1e9) // No custom aggregators only the defaults aggr := []string{} - for _, s := range tr { + for _, s := range wt { srb.HandleSpan(s, defaultEnv, aggr, &sublayers) } sb := srb.Export() @@ -398,13 +406,15 @@ func TestStatsBucketSublayersTopLevel(t *testing.T) { root := tr.GetRoot() SetSublayersOnSpan(root, sublayers) + wt := NewWeightedTrace(tr, root) + assert.NotNil(sublayers) srb := NewStatsRawBucket(0, 1e9) // No custom aggregators only the defaults aggr := []string{} - for _, s := range tr { + for _, s := range wt { srb.HandleSpan(s, defaultEnv, aggr, &sublayers) } sb := srb.Export() @@ -513,7 +523,7 @@ func BenchmarkHandleSpan(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - for _, s := range testSpans() { + for _, s := range testWeightedSpans() { srb.HandleSpan(s, defaultEnv, aggr, nil) } } @@ -529,10 +539,12 @@ func BenchmarkHandleSpanSublayers(b *testing.B) { root := tr.GetRoot() SetSublayersOnSpan(root, sublayers) + wt := NewWeightedTrace(tr, root) + b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - for _, s := range tr { + for _, s := range wt { srb.HandleSpan(s, defaultEnv, aggr, &sublayers) } } diff --git a/model/statsraw.go b/model/statsraw.go index a32b8478e..09d949c1b 100644 --- a/model/statsraw.go +++ b/model/statsraw.go @@ -184,7 +184,7 @@ func assembleGrain(b *bytes.Buffer, env, resource, service string, m map[string] } // HandleSpan adds the span to this bucket stats, aggregated with the finest grain matching given aggregators -func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, sublayers *[]SublayerValue) { +func (sb *StatsRawBucket) HandleSpan(s *WeightedSpan, env string, aggregators []string, sublayers *[]SublayerValue) { if env == "" { panic("env should never be empty") } @@ -210,7 +210,7 @@ func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, s } } -func (sb *StatsRawBucket) add(s Span, aggr string, tags TagSet) { +func (sb *StatsRawBucket) add(s *WeightedSpan, aggr string, tags TagSet) { var gs groupedStats var ok bool @@ -219,15 +219,15 @@ func (sb *StatsRawBucket) add(s Span, aggr string, tags TagSet) { gs = newGroupedStats(tags) } - if s.topLevel { - gs.topLevel += s.weight + if s.TopLevel { + gs.topLevel += s.Weight } - gs.hits += s.weight + gs.hits += s.Weight if s.Error != 0 { - gs.errors += s.weight + gs.errors += s.Weight } - gs.duration += float64(s.Duration) * s.weight + gs.duration += float64(s.Duration) * s.Weight // TODO add for s.Metrics ability to define arbitrary counts and distros, check some config? // alter resolution of duration distro @@ -241,7 +241,7 @@ func (sb *StatsRawBucket) add(s Span, aggr string, tags TagSet) { sb.data[key] = gs } -func (sb *StatsRawBucket) addSublayer(s Span, aggr string, tags TagSet, sub SublayerValue) { +func (sb *StatsRawBucket) addSublayer(s *WeightedSpan, aggr string, tags TagSet, sub SublayerValue) { // This is not as efficient as a "regular" add as we don't update // all sublayers at once (one call for HITS, and another one for ERRORS, DURATION...) // when logically, if we have a sublayer for HITS, we also have one for DURATION, @@ -260,11 +260,11 @@ func (sb *StatsRawBucket) addSublayer(s Span, aggr string, tags TagSet, sub Subl ss = newSublayerStats(subTags) } - if s.topLevel { - ss.topLevel += s.weight + if s.TopLevel { + ss.topLevel += s.Weight } - ss.value += int64(s.weight * sub.Value) + ss.value += int64(s.Weight * sub.Value) sb.sublayerData[key] = ss } diff --git a/model/sublayers.go b/model/sublayers.go index 6e6b7d2f8..dabdc6044 100644 --- a/model/sublayers.go +++ b/model/sublayers.go @@ -163,18 +163,18 @@ func buildTraceTimestamps(trace Trace) []int64 { // activeSpansMap is used by buildTraceActiveSpansMapping and is just // a map with a add function setting the key to the empty slice of no // entry exists -type activeSpansMap map[int64]Spans +type activeSpansMap map[int64][]*Span func (a activeSpansMap) Add(ts int64, span *Span) { if _, ok := a[ts]; !ok { - a[ts] = make(Spans, 0, 1) + a[ts] = make([]*Span, 0, 1) } a[ts] = append(a[ts], span) } // buildTraceActiveSpansMapping returns a mappging from timestamps to // a set of active spans -func buildTraceActiveSpansMapping(trace Trace, timestamps []int64) map[int64]Spans { +func buildTraceActiveSpansMapping(trace Trace, timestamps []int64) map[int64][]*Span { activeSpans := make(activeSpansMap, len(timestamps)) tsToIdx := make(map[int64]int, len(timestamps)) diff --git a/model/top_level.go b/model/top_level.go index c0dd7bd7a..bbfeee044 100644 --- a/model/top_level.go +++ b/model/top_level.go @@ -47,7 +47,6 @@ func (s *Span) setTopLevel(topLevel bool) { if len(s.Metrics) == 0 { s.Metrics = nil } - s.topLevel = false return } if s.Metrics == nil { @@ -56,9 +55,6 @@ func (s *Span) setTopLevel(topLevel bool) { // Setting the metrics value, so that code downstream in the pipeline // can identify this as top-level without recomputing everything. s.Metrics[topLevelKey] = 1 - // Setting the private attribute, this is used by internal agent code - // which can't access the metrics map because of concurrency issues. - s.topLevel = true } // TopLevel returns true if span is top-level. diff --git a/model/top_level_test.go b/model/top_level_test.go index e462890da..576986e02 100644 --- a/model/top_level_test.go +++ b/model/top_level_test.go @@ -20,15 +20,10 @@ func TestTopLevelTypical(t *testing.T) { tr.ComputeTopLevel() assert.True(tr[0].TopLevel(), "root span should be top-level") - assert.True(tr[0].topLevel, "root span should be top-level") assert.False(tr[1].TopLevel(), "main service, and not a root span, not top-level") - assert.False(tr[1].topLevel, "main service, and not a root span, not top-level") assert.True(tr[2].TopLevel(), "only 1 span for this service, should be top-level") - assert.True(tr[2].topLevel, "only 1 span for this service, should be top-level") assert.True(tr[3].TopLevel(), "only 1 span for this service, should be top-level") - assert.True(tr[3].topLevel, "only 1 span for this service, should be top-level") assert.False(tr[4].TopLevel(), "yet another sup span, not top-level") - assert.False(tr[4].topLevel, "yet another sup span, not top-level") } func TestTopLevelSingle(t *testing.T) { @@ -67,15 +62,10 @@ func TestTopLevelOneService(t *testing.T) { tr.ComputeTopLevel() assert.False(tr[0].TopLevel(), "just a sub-span, not top-level") - assert.False(tr[0].topLevel, "just a sub-span, not top-level") assert.False(tr[1].TopLevel(), "just a sub-span, not top-level") - assert.False(tr[1].topLevel, "just a sub-span, not top-level") assert.True(tr[2].TopLevel(), "root span should be top-level") - assert.True(tr[2].topLevel, "root span should be top-level") assert.False(tr[3].TopLevel(), "just a sub-span, not top-level") - assert.False(tr[3].topLevel, "just a sub-span, not top-level") assert.False(tr[4].TopLevel(), "just a sub-span, not top-level") - assert.False(tr[4].topLevel, "just a sub-span, not top-level") } func TestTopLevelLocalRoot(t *testing.T) { @@ -94,19 +84,12 @@ func TestTopLevelLocalRoot(t *testing.T) { tr.ComputeTopLevel() assert.True(tr[0].TopLevel(), "root span should be top-level") - assert.True(tr[0].topLevel, "root span should be top-level") assert.False(tr[1].TopLevel(), "main service, and not a root span, not top-level") - assert.False(tr[1].topLevel, "main service, and not a root span, not top-level") assert.True(tr[2].TopLevel(), "only 1 span for this service, should be top-level") - assert.True(tr[2].topLevel, "only 1 span for this service, should be top-level") assert.True(tr[3].TopLevel(), "top-level but not root") - assert.True(tr[3].topLevel, "top-level but not root") assert.False(tr[4].TopLevel(), "yet another sup span, not top-level") - assert.False(tr[4].topLevel, "yet another sup span, not top-level") assert.False(tr[5].TopLevel(), "yet another sup span, not top-level") - assert.False(tr[5].topLevel, "yet another sup span, not top-level") assert.False(tr[6].TopLevel(), "yet another sup span, not top-level") - assert.False(tr[6].topLevel, "yet another sup span, not top-level") } func TestTopLevelWithTag(t *testing.T) { @@ -122,10 +105,8 @@ func TestTopLevelWithTag(t *testing.T) { t.Logf("%v\n", tr[1].Metrics) assert.True(tr[0].TopLevel(), "root span should be top-level") - assert.True(tr[0].topLevel, "root span should be top-level") assert.Equal(float64(42), tr[0].Metrics["custom"], "custom metric should still be here") assert.False(tr[1].TopLevel(), "not a top-level span") - assert.False(tr[1].topLevel, "not a top-level span") assert.Equal(float64(42), tr[1].Metrics["custom"], "custom metric should still be here") } @@ -137,20 +118,16 @@ func TestTopLevelGetSetBlackBox(t *testing.T) { assert.False(span.TopLevel(), "by default, all spans are considered non top-level") span.setTopLevel(true) assert.True(span.TopLevel(), "marked as top-level") - assert.True(span.topLevel, "marked as top-level") span.setTopLevel(false) assert.False(span.TopLevel(), "no more top-level") - assert.False(span.topLevel, "no more top-level") span.Metrics = map[string]float64{"custom": 42} assert.False(span.TopLevel(), "by default, all spans are considered non top-level") span.setTopLevel(true) assert.True(span.TopLevel(), "marked as top-level") - assert.True(span.topLevel, "marked as top-level") span.setTopLevel(false) assert.False(span.TopLevel(), "no more top-level") - assert.False(span.topLevel, "no more top-level") } func TestTopLevelGetSetMetrics(t *testing.T) { diff --git a/model/trace.go b/model/trace.go index 686f0c2c0..9564afbdd 100644 --- a/model/trace.go +++ b/model/trace.go @@ -68,8 +68,8 @@ func (t Trace) GetRoot() *Span { // ChildrenMap returns a map containing for each span id the list of its // direct children. -func (t Trace) ChildrenMap() map[uint64]Spans { - childrenMap := make(map[uint64]Spans) +func (t Trace) ChildrenMap() map[uint64][]*Span { + childrenMap := make(map[uint64][]*Span) for i := range t { span := &t[i] @@ -80,14 +80,14 @@ func (t Trace) ChildrenMap() map[uint64]Spans { children, ok := childrenMap[span.SpanID] if !ok { - childrenMap[span.SpanID] = Spans{} + childrenMap[span.SpanID] = []*Span{} } children, ok = childrenMap[span.ParentID] if ok { children = append(children, span) } else { - children = Spans{span} + children = []*Span{span} } childrenMap[span.ParentID] = children } @@ -99,14 +99,3 @@ func (t Trace) ChildrenMap() map[uint64]Spans { func NewTraceFlushMarker() Trace { return []Span{NewFlushMarker()} } - -// ComputeWeight sets the weight private attribute to the weight -// of the root Span. This is because sampling ratio is stored as a metrics -// only is the root span, but is required for computing values in any span. -func (t Trace) ComputeWeight(root Span) { - weight := root.Weight() - - for i := range t { - t[i].weight = weight - } -} diff --git a/model/trace_test.go b/model/trace_test.go index 83719c0c9..423482245 100644 --- a/model/trace_test.go +++ b/model/trace_test.go @@ -46,10 +46,10 @@ func TestTraceChildrenMap(t *testing.T) { childrenMap := trace.ChildrenMap() - assert.Equal(Spans{&trace[1], &trace[2]}, childrenMap[1]) - assert.Equal(Spans{&trace[3]}, childrenMap[2]) - assert.Equal(Spans{&trace[4]}, childrenMap[3]) - assert.Equal(Spans{&trace[5]}, childrenMap[4]) - assert.Equal(Spans{}, childrenMap[5]) - assert.Equal(Spans{}, childrenMap[6]) + assert.Equal([]*Span{&trace[1], &trace[2]}, childrenMap[1]) + assert.Equal([]*Span{&trace[3]}, childrenMap[2]) + assert.Equal([]*Span{&trace[4]}, childrenMap[3]) + assert.Equal([]*Span{&trace[5]}, childrenMap[4]) + assert.Equal([]*Span{}, childrenMap[5]) + assert.Equal([]*Span{}, childrenMap[6]) } diff --git a/model/weighted_span.go b/model/weighted_span.go new file mode 100644 index 000000000..77911020d --- /dev/null +++ b/model/weighted_span.go @@ -0,0 +1,29 @@ +package model + +// WeightedSpan extends Span to contain weights required by the Concentrator. +type WeightedSpan struct { + Weight float64 // Span weight. Similar to the trace root.Weight(). + TopLevel bool // Is this span a service top-level or not. Similar to span.TopLevel(). + + *Span +} + +// WeightedTrace is a slice of WeightedSpan pointers. +type WeightedTrace []*WeightedSpan + +// NewWeightedTrace returns a weighted trace, with coefficient required by the concentrator. +func NewWeightedTrace(trace []Span, root *Span) WeightedTrace { + wt := make(WeightedTrace, len(trace)) + + weight := root.Weight() + + for i := range trace { + wt[i] = &WeightedSpan{ + Span: &trace[i], + Weight: weight, + TopLevel: trace[i].TopLevel(), + } + } + + return wt +}