Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Enforce message event and link limits. #1022

Merged
merged 1 commit into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions trace/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ type SpanData struct {
Annotations []Annotation
MessageEvents []MessageEvent
Status
Links []Link
HasRemoteParent bool
DroppedAttributeCount int
DroppedAnnotationCount int
Links []Link
HasRemoteParent bool
DroppedAttributeCount int
DroppedAnnotationCount int
DroppedMessageEventCount int
DroppedLinkCount int
}
38 changes: 35 additions & 3 deletions trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type Span struct {
// annotations are stored in FIFO queue capped by configured limit.
annotations *evictedQueue

// messageEvents are stored in FIFO queue capped by configured limit.
messageEvents *evictedQueue

// links are stored in FIFO queue capped by configured limit.
links *evictedQueue

// spanStore is the spanStore this span belongs to, if any, otherwise it is nil.
*spanStore
endOnce sync.Once
Expand Down Expand Up @@ -236,6 +242,8 @@ func startSpanInternal(name string, hasParent bool, parent SpanContext, remotePa
}
span.lruAttributes = newLruMap(cfg.MaxAttributesPerSpan)
span.annotations = newEvictedQueue(cfg.MaxAnnotationEventsPerSpan)
span.messageEvents = newEvictedQueue(cfg.MaxMessageEventsPerSpan)
span.links = newEvictedQueue(cfg.MaxLinksPerSpan)

if hasParent {
span.data.ParentSpanID = parent.SpanID
Expand Down Expand Up @@ -295,6 +303,14 @@ func (s *Span) makeSpanData() *SpanData {
sd.Annotations = s.interfaceArrayToAnnotationArray()
sd.DroppedAnnotationCount = s.annotations.droppedCount
}
if len(s.messageEvents.queue) > 0 {
sd.MessageEvents = s.interfaceArrayToMessageEventArray()
sd.DroppedMessageEventCount = s.messageEvents.droppedCount
}
if len(s.links.queue) > 0 {
sd.Links = s.interfaceArrayToLinksArray()
sd.DroppedLinkCount = s.links.droppedCount
}
s.mu.Unlock()
return &sd
}
Expand Down Expand Up @@ -327,6 +343,22 @@ func (s *Span) SetStatus(status Status) {
s.mu.Unlock()
}

func (s *Span) interfaceArrayToLinksArray() []Link {
linksArr := make([]Link, 0)
for _, value := range s.links.queue {
linksArr = append(linksArr, value.(Link))
}
return linksArr
}

func (s *Span) interfaceArrayToMessageEventArray() []MessageEvent {
messageEventArr := make([]MessageEvent, 0)
for _, value := range s.messageEvents.queue {
messageEventArr = append(messageEventArr, value.(MessageEvent))
}
return messageEventArr
}

func (s *Span) interfaceArrayToAnnotationArray() []Annotation {
annotationArr := make([]Annotation, 0)
for _, value := range s.annotations.queue {
Expand Down Expand Up @@ -434,7 +466,7 @@ func (s *Span) AddMessageSendEvent(messageID, uncompressedByteSize, compressedBy
}
now := time.Now()
s.mu.Lock()
s.data.MessageEvents = append(s.data.MessageEvents, MessageEvent{
s.messageEvents.add(MessageEvent{
Time: now,
EventType: MessageEventTypeSent,
MessageID: messageID,
Expand All @@ -456,7 +488,7 @@ func (s *Span) AddMessageReceiveEvent(messageID, uncompressedByteSize, compresse
}
now := time.Now()
s.mu.Lock()
s.data.MessageEvents = append(s.data.MessageEvents, MessageEvent{
s.messageEvents.add(MessageEvent{
Time: now,
EventType: MessageEventTypeRecv,
MessageID: messageID,
Expand All @@ -472,7 +504,7 @@ func (s *Span) AddLink(l Link) {
return
}
s.mu.Lock()
s.data.Links = append(s.data.Links, l)
s.links.add(l)
s.mu.Unlock()
}

Expand Down
82 changes: 82 additions & 0 deletions trace/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,45 @@ func TestMessageEvents(t *testing.T) {
}
}

func TestMessageEventsOverLimit(t *testing.T) {
cfg := Config{MaxMessageEventsPerSpan: 2}
ApplyConfig(cfg)
span := startSpan(StartOptions{})
span.AddMessageReceiveEvent(5, 300, 120)
span.AddMessageSendEvent(4, 100, 50)
span.AddMessageReceiveEvent(3, 400, 300)
span.AddMessageSendEvent(1, 200, 100)
got, err := endSpan(span)
if err != nil {
t.Fatal(err)
}

for i := range got.MessageEvents {
if !checkTime(&got.MessageEvents[i].Time) {
t.Error("exporting span: expected nonzero MessageEvent Time")
}
}

want := &SpanData{
SpanContext: SpanContext{
TraceID: tid,
SpanID: SpanID{},
TraceOptions: 0x1,
},
ParentSpanID: sid,
Name: "span0",
MessageEvents: []MessageEvent{
{EventType: 2, MessageID: 0x3, UncompressedByteSize: 0x190, CompressedByteSize: 0x12c},
{EventType: 1, MessageID: 0x1, UncompressedByteSize: 0xc8, CompressedByteSize: 0x64},
},
DroppedMessageEventCount: 2,
HasRemoteParent: true,
}
if !reflect.DeepEqual(got, want) {
t.Errorf("exporting span: got %#v want %#v", got, want)
}
}

func TestSetSpanName(t *testing.T) {
want := "SpanName-1"
span := startSpan(StartOptions{})
Expand Down Expand Up @@ -633,6 +672,49 @@ func TestAddLink(t *testing.T) {
}
}

func TestAddLinkOverLimit(t *testing.T) {
cfg := Config{MaxLinksPerSpan: 1}
ApplyConfig(cfg)
span := startSpan(StartOptions{})
span.AddLink(Link{
TraceID: tid,
SpanID: sid,
Type: LinkTypeParent,
Attributes: map[string]interface{}{"key4": "value4"},
})
span.AddLink(Link{
TraceID: tid,
SpanID: sid,
Type: LinkTypeParent,
Attributes: map[string]interface{}{"key5": "value5"},
})
got, err := endSpan(span)
if err != nil {
t.Fatal(err)
}

want := &SpanData{
SpanContext: SpanContext{
TraceID: tid,
SpanID: SpanID{},
TraceOptions: 0x1,
},
ParentSpanID: sid,
Name: "span0",
Links: []Link{{
TraceID: tid,
SpanID: sid,
Type: 2,
Attributes: map[string]interface{}{"key5": "value5"},
}},
DroppedLinkCount: 1,
HasRemoteParent: true,
}
if !reflect.DeepEqual(got, want) {
t.Errorf("exporting span: got %#v want %#v", got, want)
}
}

func TestUnregisterExporter(t *testing.T) {
var te testExporter
RegisterExporter(&te)
Expand Down