Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
Compute sublayers for non root toplevel spans (#384)
Browse files Browse the repository at this point in the history
* Compute sublayers for non root toplevel spans

* Switch to a DFS instead of a BFS

* Add types to concentrator tests

* Add logging when detecting cycles

* Address comments
  • Loading branch information
bmermet authored Mar 1, 2018
1 parent 9ca50cb commit b0d80ec
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 19 deletions.
11 changes: 8 additions & 3 deletions cmd/trace-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type processedTrace struct {
WeightedTrace model.WeightedTrace
Root *model.Span
Env string
Sublayers []model.SublayerValue
Sublayers map[*model.Span][]model.SublayerValue
}

func (pt *processedTrace) weight() float64 {
Expand Down Expand Up @@ -227,8 +227,13 @@ func (a *Agent) Process(t model.Trace) {
t.ComputeTopLevel()
wt := model.NewWeightedTrace(t, root)

sublayers := model.ComputeSublayers(t)
model.SetSublayersOnSpan(root, sublayers)
subtraces := t.ExtractTopLevelSubtraces(root)
sublayers := make(map[*model.Span][]model.SublayerValue)
for _, subtrace := range subtraces {
subtraceSublayers := model.ComputeSublayers(subtrace.Trace)
sublayers[subtrace.Root] = subtraceSublayers
model.SetSublayersOnSpan(subtrace.Root, subtraceSublayers)
}

for i := range t {
quantizer.Quantize(t[i])
Expand Down
6 changes: 3 additions & 3 deletions cmd/trace-agent/concentrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ func (c *Concentrator) Add(t processedTrace) {
c.buckets[btime] = b
}

if t.Root != nil && s.SpanID == t.Root.SpanID && t.Sublayers != nil {
// handle sublayers
b.HandleSpan(s, t.Env, c.aggregators, &t.Sublayers)
sublayers, ok := t.Sublayers[s.Span]
if ok {
b.HandleSpan(s, t.Env, c.aggregators, &sublayers)
} else {
b.HandleSpan(s, t.Env, c.aggregators, nil)
}
Expand Down
114 changes: 102 additions & 12 deletions cmd/trace-agent/concentrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ func getTsInBucket(alignedNow int64, bsize int64, offset int64) int64 {

// testSpan avoids typo and inconsistency in test spans (typical pitfall: duration, start time,
// and end time are aligned, and end time is the one that needs to be aligned
func testSpan(c *Concentrator, spanID uint64, duration, offset int64, service, resource string, err int32) *model.Span {
func testSpan(c *Concentrator, spanID uint64, parentID uint64, duration, offset int64, service, resource string, err int32) *model.Span {
	// Anchor the span's end time inside the bucket selected by offset,
	// then derive Start from Duration so start/duration/end stay aligned.
	now := model.Now()
	bucketStart := now - now%c.bsize
	end := getTsInBucket(bucketStart, c.bsize, offset)

	s := model.Span{
		SpanID:   spanID,
		ParentID: parentID,
		Service:  service,
		Name:     "query",
		Resource: resource,
		Type:     "db",
		Error:    err,
		Duration: duration,
		Start:    end - duration,
	}
	return &s
}

Expand All @@ -48,19 +50,19 @@ func TestConcentratorStatsCounts(t *testing.T) {

trace := model.Trace{
// first bucket
testSpan(c, 1, 24, 3, "A1", "resource1", 0),
testSpan(c, 2, 12, 3, "A1", "resource1", 2),
testSpan(c, 3, 40, 3, "A2", "resource2", 2),
testSpan(c, 4, 300000000000, 3, "A2", "resource2", 2), // 5 minutes trace
testSpan(c, 5, 30, 3, "A2", "resourcefoo", 0),
testSpan(c, 1, 0, 24, 3, "A1", "resource1", 0),
testSpan(c, 2, 0, 12, 3, "A1", "resource1", 2),
testSpan(c, 3, 0, 40, 3, "A2", "resource2", 2),
testSpan(c, 4, 0, 300000000000, 3, "A2", "resource2", 2), // 5 minutes trace
testSpan(c, 5, 0, 30, 3, "A2", "resourcefoo", 0),
// second bucket
testSpan(c, 6, 24, 2, "A1", "resource2", 0),
testSpan(c, 7, 12, 2, "A1", "resource1", 2),
testSpan(c, 8, 40, 2, "A2", "resource1", 2),
testSpan(c, 9, 30, 2, "A2", "resource2", 2),
testSpan(c, 10, 3600000000000, 2, "A2", "resourcefoo", 0), // 1 hour trace
testSpan(c, 6, 0, 24, 2, "A1", "resource2", 0),
testSpan(c, 7, 0, 12, 2, "A1", "resource1", 2),
testSpan(c, 8, 0, 40, 2, "A2", "resource1", 2),
testSpan(c, 9, 0, 30, 2, "A2", "resource2", 2),
testSpan(c, 10, 0, 3600000000000, 2, "A2", "resourcefoo", 0), // 1 hour trace
// third bucket - but should not be flushed because it's the second to last
testSpan(c, 6, 24, 1, "A1", "resource2", 0),
testSpan(c, 6, 0, 24, 1, "A1", "resource2", 0),
}
trace.ComputeTopLevel()
wt := model.NewWeightedTrace(trace, trace.GetRoot())
Expand Down Expand Up @@ -145,3 +147,91 @@ func TestConcentratorStatsCounts(t *testing.T) {
assert.Equal(val, int64(count.Value), "Wrong value for count %s", key)
}
}

// This test makes sure that sublayers related stats are properly created
func TestConcentratorSublayersStatsCounts(t *testing.T) {
assert := assert.New(t)
statsChan := make(chan []model.StatsBucket)
c := NewConcentrator([]string{}, testBucketInterval, statsChan)

now := model.Now()
alignedNow := now - now%c.bsize

// Span tree under test (parentID links):
//   span 1 (A1, root)
//   ├── span 2 (A2) ── spans 4, 5, 6 (A3)
//   └── span 3 (A2)
trace := model.Trace{
// first bucket
testSpan(c, 1, 0, 2000, 3, "A1", "resource1", 0),
testSpan(c, 2, 1, 1000, 3, "A2", "resource2", 0),
testSpan(c, 3, 1, 1000, 3, "A2", "resource3", 0),
testSpan(c, 4, 2, 40, 3, "A3", "resource4", 0),
testSpan(c, 5, 2, 300, 3, "A3", "resource5", 0),
testSpan(c, 6, 2, 30, 3, "A3", "resource6", 0),
}
trace.ComputeTopLevel()
wt := model.NewWeightedTrace(trace, trace.GetRoot())

// Mirror what Agent.Process does: compute sublayer values for every
// toplevel subtrace, keyed by the subtrace root span.
subtraces := trace.ExtractTopLevelSubtraces(trace.GetRoot())
sublayers := make(map[*model.Span][]model.SublayerValue)
for _, subtrace := range subtraces {
subtraceSublayers := model.ComputeSublayers(subtrace.Trace)
sublayers[subtrace.Root] = subtraceSublayers
}

testTrace := processedTrace{
Env: "none",
Trace: trace,
WeightedTrace: wt,
Sublayers: sublayers,
}

c.Add(testTrace)
stats := c.Flush()

if !assert.Equal(1, len(stats), "We should get exactly 1 StatsBucket") {
t.FailNow()
}

assert.Equal(alignedNow-3*testBucketInterval, stats[0].Start)

var receivedCounts map[string]model.Count

// Start with the first/older bucket
receivedCounts = stats[0].Counts
// _sublayers.* entries exist only for the two toplevel spans with children
// (spans 1 and 2); duration/errors/hits are emitted for every span.
expectedCountValByKey := map[string]int64{
"query|_sublayers.duration.by_service|env:none,resource:resource1,service:A1,sublayer_service:A1": 2000,
"query|_sublayers.duration.by_service|env:none,resource:resource1,service:A1,sublayer_service:A2": 2000,
"query|_sublayers.duration.by_service|env:none,resource:resource1,service:A1,sublayer_service:A3": 370,
"query|_sublayers.duration.by_service|env:none,resource:resource2,service:A2,sublayer_service:A2": 1000,
"query|_sublayers.duration.by_service|env:none,resource:resource2,service:A2,sublayer_service:A3": 370,
"query|_sublayers.duration.by_type|env:none,resource:resource1,service:A1,sublayer_type:db": 4370,
"query|_sublayers.duration.by_type|env:none,resource:resource2,service:A2,sublayer_type:db": 1370,
"query|_sublayers.span_count|env:none,resource:resource1,service:A1,:": 6,
"query|_sublayers.span_count|env:none,resource:resource2,service:A2,:": 4,
"query|duration|env:none,resource:resource1,service:A1": 2000,
"query|duration|env:none,resource:resource2,service:A2": 1000,
"query|duration|env:none,resource:resource3,service:A2": 1000,
"query|duration|env:none,resource:resource4,service:A3": 40,
"query|duration|env:none,resource:resource5,service:A3": 300,
"query|duration|env:none,resource:resource6,service:A3": 30,
"query|errors|env:none,resource:resource1,service:A1": 0,
"query|errors|env:none,resource:resource2,service:A2": 0,
"query|errors|env:none,resource:resource3,service:A2": 0,
"query|errors|env:none,resource:resource4,service:A3": 0,
"query|errors|env:none,resource:resource5,service:A3": 0,
"query|errors|env:none,resource:resource6,service:A3": 0,
"query|hits|env:none,resource:resource1,service:A1": 1,
"query|hits|env:none,resource:resource2,service:A2": 1,
"query|hits|env:none,resource:resource3,service:A2": 1,
"query|hits|env:none,resource:resource4,service:A3": 1,
"query|hits|env:none,resource:resource5,service:A3": 1,
"query|hits|env:none,resource:resource6,service:A3": 1,
}

// verify we got all counts
assert.Equal(len(expectedCountValByKey), len(receivedCounts), "GOT %v", receivedCounts)
// verify values
for key, val := range expectedCountValByKey {
count, ok := receivedCounts[key]
assert.True(ok, "%s was expected from concentrator", key)
assert.Equal(val, int64(count.Value), "Wrong value for count %s", key)
}
}
2 changes: 1 addition & 1 deletion model/sublayers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (v SublayerValue) GoString() string {
// direct child span at that time interval. This is done by
// iterating over the spans, iterating over each time
// intervals, and checking if the span has a child running
// during that time interval. If now, it is considered active:
// during that time interval. If not, it is considered active:
//
// {
// 0: [ 1 ],
Expand Down
84 changes: 84 additions & 0 deletions model/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,87 @@ func (t Trace) APITrace() *APITrace {
EndTime: end,
}
}

// Subtrace represents the combination of a root span and the trace consisting of all its descendant spans
type Subtrace struct {
Root *Span // toplevel span the subtrace is rooted at
Trace Trace // Root plus all of its descendants
}

// spanAndAncestors is used by ExtractTopLevelSubtraces to store the pair of a span and its ancestors
type spanAndAncestors struct {
Span *Span // span being visited by the DFS
Ancestors []*Span // toplevel ancestors of Span (includes Span itself when it is a toplevel span with children)
}

// element and stack implement a very basic LIFO used to do an iterative DFS on a trace
type element struct {
SpanAndAncestors *spanAndAncestors // payload stored in this stack node
Next *element // next node down the stack, nil at the bottom
}

// stack is a singly-linked LIFO; its zero value is an empty, ready-to-use stack.
type stack struct {
head *element // top of the stack, nil when empty
}

// Push puts value on top of the stack.
func (s *stack) Push(value *spanAndAncestors) {
	// Prepending to the linked list covers the empty-stack case too:
	// when head is nil the new element simply gets a nil Next.
	s.head = &element{SpanAndAncestors: value, Next: s.head}
}

// Pop removes and returns the most recently pushed value, or nil when the
// stack is empty.
func (s *stack) Pop() *spanAndAncestors {
	top := s.head
	if top == nil {
		return nil
	}
	s.head = top.Next
	return top.SpanAndAncestors
}

// ExtractTopLevelSubtraces extracts all subtraces rooted in a toplevel span,
// ComputeTopLevel should be called before.
// Each subtrace contains its toplevel root plus all of that root's
// descendants; toplevel spans without children yield no subtrace. The order
// of the returned slice is unspecified (it comes from map iteration).
func (t Trace) ExtractTopLevelSubtraces(root *Span) []Subtrace {
if root == nil {
return []Subtrace{}
}
childrenMap := t.ChildrenMap()
subtraces := []Subtrace{}

// visited guards against cycles in malformed traces; subtracesMap
// accumulates, for each qualifying toplevel span, every span below it.
visited := make(map[*Span]bool, len(t))
subtracesMap := make(map[*Span][]*Span)
var next stack
next.Push(&spanAndAncestors{root, []*Span{}})

// We do a DFS on the trace to record the toplevel ancestors of each span
for current := next.Pop(); current != nil; current = next.Pop() {
// We do not extract subtraces for toplevel spans that have no children
// since these are not interesting
if current.Span.TopLevel() && len(childrenMap[current.Span.SpanID]) > 0 {
// A qualifying toplevel span counts as its own ancestor so that it
// is recorded into its own subtrace below.
current.Ancestors = append(current.Ancestors, current.Span)
}
visited[current.Span] = true
for _, ancestor := range current.Ancestors {
subtracesMap[ancestor] = append(subtracesMap[ancestor], current.Span)
}
for _, child := range childrenMap[current.Span.SpanID] {
// Continue if this span has already been explored (meaning the
// trace is not a tree)
if visited[child] {
log.Warnf("Found a cycle while processing traceID:%v, trace should be a tree", t[0].TraceID)
continue
}
next.Push(&spanAndAncestors{child, current.Ancestors})
}
}

for topLevel, subtrace := range subtracesMap {
subtraces = append(subtraces, Subtrace{topLevel, subtrace})
}

return subtraces
}
90 changes: 90 additions & 0 deletions model/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,93 @@ func TestTraceChildrenMap(t *testing.T) {
assert.Equal([]*Span{}, childrenMap[5])
assert.Equal([]*Span{}, childrenMap[6])
}

// TestExtractTopLevelSubtracesWithSimpleTrace checks that a trace with two
// toplevel spans (the root and the first service change) yields one subtrace
// per toplevel span, each containing that span plus all of its descendants.
func TestExtractTopLevelSubtracesWithSimpleTrace(t *testing.T) {
	assert := assert.New(t)

	trace := Trace{
		&Span{SpanID: 1, ParentID: 0, Service: "s1"},
		&Span{SpanID: 2, ParentID: 1, Service: "s2"},
		&Span{SpanID: 3, ParentID: 2, Service: "s2"},
		&Span{SpanID: 4, ParentID: 3, Service: "s2"},
		&Span{SpanID: 5, ParentID: 1, Service: "s1"},
	}
	trace.ComputeTopLevel()

	subtraces := trace.ExtractTopLevelSubtraces(trace[0])

	expected := []Subtrace{
		{Root: trace[0], Trace: trace},
		{Root: trace[1], Trace: []*Span{trace[1], trace[2], trace[3]}},
	}
	assert.Equal(len(expected), len(subtraces))

	// Index the result by root: ExtractTopLevelSubtraces gives no ordering guarantee.
	byRoot := make(map[*Span]Subtrace, len(subtraces))
	for _, st := range subtraces {
		byRoot[st.Root] = st
	}
	for _, want := range expected {
		assert.ElementsMatch(want.Trace, byRoot[want.Root].Trace)
	}
}

// TestExtractTopLevelSubtracesShouldIgnoreLeafTopLevel checks that a toplevel
// span without children (span 4, a service change with no descendants) does
// not produce a subtrace of its own.
func TestExtractTopLevelSubtracesShouldIgnoreLeafTopLevel(t *testing.T) {
	assert := assert.New(t)

	trace := Trace{
		&Span{SpanID: 1, ParentID: 0, Service: "s1"},
		&Span{SpanID: 2, ParentID: 1, Service: "s2"},
		&Span{SpanID: 3, ParentID: 2, Service: "s2"},
		&Span{SpanID: 4, ParentID: 1, Service: "s3"},
	}
	trace.ComputeTopLevel()

	subtraces := trace.ExtractTopLevelSubtraces(trace[0])

	expected := []Subtrace{
		{Root: trace[0], Trace: trace},
		{Root: trace[1], Trace: []*Span{trace[1], trace[2]}},
	}
	assert.Equal(len(expected), len(subtraces))

	// Index the result by root: ExtractTopLevelSubtraces gives no ordering guarantee.
	byRoot := make(map[*Span]Subtrace, len(subtraces))
	for _, st := range subtraces {
		byRoot[st.Root] = st
	}
	for _, want := range expected {
		assert.ElementsMatch(want.Trace, byRoot[want.Root].Trace)
	}
}

// TestExtractTopLevelSubtracesWorksInSpiteOfCycles checks that extraction
// terminates and still returns the expected subtraces on a malformed trace
// containing a parent cycle (1 -> 3 -> 2 -> 1).
func TestExtractTopLevelSubtracesWorksInSpiteOfCycles(t *testing.T) {
	assert := assert.New(t)

	trace := Trace{
		&Span{SpanID: 1, ParentID: 3, Service: "s1"},
		&Span{SpanID: 2, ParentID: 1, Service: "s2"},
		&Span{SpanID: 3, ParentID: 2, Service: "s2"},
	}
	trace.ComputeTopLevel()

	subtraces := trace.ExtractTopLevelSubtraces(trace[0])

	expected := []Subtrace{
		{Root: trace[0], Trace: trace},
		{Root: trace[1], Trace: []*Span{trace[1], trace[2]}},
	}
	assert.Equal(len(expected), len(subtraces))

	// Index the result by root: ExtractTopLevelSubtraces gives no ordering guarantee.
	byRoot := make(map[*Span]Subtrace, len(subtraces))
	for _, st := range subtraces {
		byRoot[st.Root] = st
	}
	for _, want := range expected {
		assert.ElementsMatch(want.Trace, byRoot[want.Root].Trace)
	}
}

0 comments on commit b0d80ec

Please sign in to comment.