Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Replace Span with a protobuf model #339

Merged
merged 1 commit into from
Dec 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ EXCLUDE_LINT = [
'model/services_gen.go',
'model/trace_gen.go',
'model/span_gen.go',
'model/span.pb.go',
]

task :default => [:ci]
Expand Down Expand Up @@ -168,3 +169,8 @@ task :ci => [:fmt, :vet, :lint, :test, :build, :windows]
# Run errcheck over the agent package tree to flag unchecked error return
# values. NOTE(review): requires the errcheck binary on PATH — confirm it is
# installed in CI (see the :ci task dependencies above).
task :err do
sh "errcheck github.com/DataDog/datadog-trace-agent"
end

# Regenerate Go sources from the .proto files under model/ using the
# gogofaster protoc plugin; output lands next to the .proto files in model/.
task :protobuf do
# Prerequisites: protoc (protobuf 3.x) on PATH, and the gogo/protobuf plugin
# available via the vendor/ directory (-I=vendor resolves gogoproto imports).
sh "protoc -I=model -I=vendor --gogofaster_out=model model/*.proto"
end
25 changes: 13 additions & 12 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ const (
)

type processedTrace struct {
Trace model.Trace
Root *model.Span
Env string
Sublayers []model.SublayerValue
Trace model.Trace
WeightedTrace model.WeightedTrace
Root *model.Span
Env string
Sublayers []model.SublayerValue
}

func (pt *processedTrace) weight() float64 {
Expand Down Expand Up @@ -217,7 +218,10 @@ func (a *Agent) Process(t model.Trace) {
rate *= a.Receiver.preSampler.Rate()
sampler.SetTraceAppliedSampleRate(root, rate)

// Need to do this computation before entering the concentrator
// as they access the Metrics map, which is not thread safe.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 much appreciated comment.

t.ComputeTopLevel()
wt := model.NewWeightedTrace(t, root)

sublayers := model.ComputeSublayers(t)
model.SetSublayersOnSpan(root, sublayers)
Expand All @@ -228,19 +232,16 @@ func (a *Agent) Process(t model.Trace) {
}

pt := processedTrace{
Trace: t,
Root: root,
Env: a.conf.DefaultEnv,
Sublayers: sublayers,
Trace: t,
WeightedTrace: wt,
Root: root,
Env: a.conf.DefaultEnv,
Sublayers: sublayers,
}
if tenv := t.GetEnv(); tenv != "" {
pt.Env = tenv
}

// Need to do this computation before entering the concentrator
// as they access the Metrics map, which is not thread safe.
t.ComputeWeight(*root)
t.ComputeTopLevel()
go func() {
defer watchdog.LogOnPanic()
a.Concentrator.Add(pt)
Expand Down
2 changes: 1 addition & 1 deletion agent/concentrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewConcentrator(aggregators []string, bsize int64) *Concentrator {
func (c *Concentrator) Add(t processedTrace) {
c.mu.Lock()

for _, s := range t.Trace {
for _, s := range t.WeightedTrace {
btime := s.End() - s.End()%c.bsize
b, ok := c.buckets[btime]
if !ok {
Expand Down
41 changes: 22 additions & 19 deletions agent/concentrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,30 @@ func TestConcentratorStatsCounts(t *testing.T) {
now := model.Now()
alignedNow := now - now%c.bsize

trace := model.Trace{
// first bucket
testSpan(c, 1, 24, 3, "A1", "resource1", 0),
testSpan(c, 2, 12, 3, "A1", "resource1", 2),
testSpan(c, 3, 40, 3, "A2", "resource2", 2),
testSpan(c, 4, 300000000000, 3, "A2", "resource2", 2), // 5 minutes trace
testSpan(c, 5, 30, 3, "A2", "resourcefoo", 0),
// second bucket
testSpan(c, 6, 24, 2, "A1", "resource2", 0),
testSpan(c, 7, 12, 2, "A1", "resource1", 2),
testSpan(c, 8, 40, 2, "A2", "resource1", 2),
testSpan(c, 9, 30, 2, "A2", "resource2", 2),
testSpan(c, 10, 3600000000000, 2, "A2", "resourcefoo", 0), // 1 hour trace
// third bucket - but should not be flushed because it's the second to last
testSpan(c, 6, 24, 1, "A1", "resource2", 0),
}
trace.ComputeTopLevel()
wt := model.NewWeightedTrace(trace, trace.GetRoot())

testTrace := processedTrace{
Env: "none",
Trace: model.Trace{
// first bucket
testSpan(c, 1, 24, 3, "A1", "resource1", 0),
testSpan(c, 2, 12, 3, "A1", "resource1", 2),
testSpan(c, 3, 40, 3, "A2", "resource2", 2),
testSpan(c, 4, 300000000000, 3, "A2", "resource2", 2), // 5 minutes trace
testSpan(c, 5, 30, 3, "A2", "resourcefoo", 0),
// second bucket
testSpan(c, 6, 24, 2, "A1", "resource2", 0),
testSpan(c, 7, 12, 2, "A1", "resource1", 2),
testSpan(c, 8, 40, 2, "A2", "resource1", 2),
testSpan(c, 9, 30, 2, "A2", "resource2", 2),
testSpan(c, 10, 3600000000000, 2, "A2", "resourcefoo", 0), // 1 hour trace
// third bucket - but should not be flushed because it's the second to last
testSpan(c, 6, 24, 1, "A1", "resource2", 0),
},
Env: "none",
Trace: trace,
WeightedTrace: wt,
}
testTrace.Trace.ComputeWeight(*testTrace.Trace.GetRoot())
testTrace.Trace.ComputeTopLevel()

c.Add(testTrace)
stats := c.Flush()
Expand Down
4 changes: 2 additions & 2 deletions agent/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func BenchmarkHandleSpanRandom(b *testing.B) {
for i := 0; i < b.N; i++ {
trace := fixtures.RandomTrace(10, 8)
root := trace.GetRoot()
trace.ComputeWeight(*root)
trace.ComputeTopLevel()
for _, span := range trace {
wt := model.NewWeightedTrace(trace, root)
for _, span := range wt {
sb.HandleSpan(span, defaultEnv, aggr, nil)
}
}
Expand Down
26 changes: 21 additions & 5 deletions fixtures/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,16 @@ func RandomSpan() model.Span {
}
}

// RandomWeightedSpan builds a WeightedSpan wrapping a freshly generated
// random span. The span is given a neutral weight of 1 and is marked
// top-level, which makes it suitable as input for stats-bucket tests.
func RandomWeightedSpan() *model.WeightedSpan {
	span := RandomSpan()
	ws := model.WeightedSpan{Span: &span, Weight: 1, TopLevel: true}
	return &ws
}

// GetTestSpan returns a Span with different fields set
func GetTestSpan() model.Span {
span := model.Span{
Expand All @@ -281,7 +291,6 @@ func GetTestSpan() model.Span {
Metrics: map[string]float64{"http.monitor": 41.99},
}
trace := model.Trace{span}
trace.ComputeWeight(model.Span{})
trace.ComputeTopLevel()
return trace[0]
}
Expand All @@ -307,8 +316,15 @@ func TestSpan() model.Span {
ParentID: 1111,
Type: "http",
}
trace := model.Trace{span}
trace.ComputeWeight(model.Span{})
trace.ComputeTopLevel()
return trace[0]
return span
}

// TestWeightedSpan returns a static weighted span — TestSpan wrapped with
// weight 1 and TopLevel set — so stats tests get reproducible input.
func TestWeightedSpan() *model.WeightedSpan {
	base := TestSpan()
	return &model.WeightedSpan{
		TopLevel: true,
		Weight:   1,
		Span:     &base,
	}
}
11 changes: 6 additions & 5 deletions fixtures/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fixtures

import (
"encoding/json"

"github.com/DataDog/datadog-trace-agent/model"
)

Expand All @@ -12,7 +13,7 @@ const defaultEnv = "none"
// TestStatsBucket returns a fixed stats bucket to be used in unit tests
func TestStatsBucket() model.StatsBucket {
srb := model.NewStatsRawBucket(0, 1e9)
srb.HandleSpan(TestSpan(), defaultEnv, defaultAggregators, nil)
srb.HandleSpan(TestWeightedSpan(), defaultEnv, defaultAggregators, nil)
sb := srb.Export()

// marshalling then unmarshalling data to:
Expand All @@ -33,19 +34,19 @@ func TestStatsBucket() model.StatsBucket {
}

// StatsBucketWithSpans returns a stats bucket populated with spans stats
func StatsBucketWithSpans(s []model.Span) model.StatsBucket {
func StatsBucketWithSpans(spans []*model.WeightedSpan) model.StatsBucket {
srb := model.NewStatsRawBucket(0, 1e9)
for _, s := range s {
for _, s := range spans {
srb.HandleSpan(s, defaultEnv, defaultAggregators, nil)
}
return srb.Export()
}

// RandomStatsBucket returns a bucket made from n random spans, useful to run benchmarks and tests
func RandomStatsBucket(n int) model.StatsBucket {
spans := make([]model.Span, 0, n)
spans := make([]*model.WeightedSpan, 0, n)
for i := 0; i < n; i++ {
spans = append(spans, RandomSpan())
spans = append(spans, RandomWeightedSpan())
}

return StatsBucketWithSpans(spans)
Expand Down
13 changes: 11 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import:
- windows/svc/debug
- windows/svc/eventlog
- windows/svc/mgr
- package: github.com/golang/protobuf
subpackages:
- protoc-gen-go
- proto
- package: github.com/gogo/protobuf
version: v0.5
subpackages:
- gogoproto
testImport:
- package: github.com/stretchr/testify
version: v1.1.4
Expand Down
8 changes: 4 additions & 4 deletions model/normalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestNormalizeTraceTraceIdMismatch(t *testing.T) {
span2 := testSpan()
span2.TraceID = 2

trace := Trace{span1, span2}
trace := Trace{*span1, *span2}

_, err := NormalizeTrace(trace)
assert.Error(t, err)
Expand All @@ -239,7 +239,7 @@ func TestNormalizeTraceInvalidSpan(t *testing.T) {
span2 := testSpan()
span2.Name = "" // invalid

trace := Trace{span1, span2}
trace := Trace{*span1, *span2}

_, err := NormalizeTrace(trace)
assert.Error(t, err)
Expand All @@ -250,7 +250,7 @@ func TestNormalizeTraceDuplicateSpanID(t *testing.T) {
span2 := testSpan()
span2.SpanID = span1.SpanID

trace := Trace{span1, span2}
trace := Trace{*span1, *span2}

_, err := NormalizeTrace(trace)
assert.Error(t, err)
Expand All @@ -262,7 +262,7 @@ func TestNormalizeTrace(t *testing.T) {
span2 := testSpan()
span2.SpanID++

trace := Trace{span1, span2}
trace := Trace{*span1, *span2}

_, err := NormalizeTrace(trace)
assert.NoError(t, err)
Expand Down
Loading