diff --git a/agent/agent.go b/agent/agent.go index 4306950d3..e03c4cafd 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -183,8 +183,10 @@ func (a *Agent) Process(t model.Trace) { if priority == 0 { priorityPtr = &ts.TracesPriority0 - } else { + } else if priority == 1 { priorityPtr = &ts.TracesPriority1 + } else { + priorityPtr = &ts.TracesPriority2 } } } diff --git a/agent/receiver.go b/agent/receiver.go index 4da95a8f1..a52e5e53c 100644 --- a/agent/receiver.go +++ b/agent/receiver.go @@ -313,7 +313,10 @@ func (r *HTTPReceiver) logStats() { if now.Sub(lastLog) >= time.Minute { // We expose the stats accumulated to expvar updateReceiverStats(accStats) - log.Info(accStats.String()) + + for logStr := range accStats.Strings() { + log.Info(logStr) + } // We reset the stats accumulated during the last minute accStats.reset() diff --git a/agent/stats.go b/agent/stats.go index 31b072cb0..fc2e03f39 100644 --- a/agent/stats.go +++ b/agent/stats.go @@ -51,25 +51,32 @@ func (rs *receiverStats) publish() { func (rs *receiverStats) reset() { rs.Lock() - for _, tagStats := range rs.Stats { + for key, tagStats := range rs.Stats { + // If a tagStats was empty, let's drop it. + // That's a way to avoid empty stats entries or over-time leaks. + if tagStats.isEmpty() { + delete(rs.Stats, key) + } tagStats.reset() } rs.Unlock() } -// String gives a string representation of the receiverStats struct. -func (rs *receiverStats) String() string { +// Strings gives a multi strings representation of the receiverStats struct. +func (rs *receiverStats) Strings() []string { rs.RLock() defer rs.RUnlock() - str := "" if len(rs.Stats) == 0 { - return "no data received" + return []string{"no data received"} } + + strings := make([]string, len(rs.Stats)) + for _, ts := range rs.Stats { - str += fmt.Sprintf("\n\t%v -> %s", ts.Tags.toArray(), ts.String()) + strings = append(strings, fmt.Sprintf("%v -> %s", ts.Tags.toArray(), ts.String())) } - return str + return strings } // tagStats is the struct used to associate the stats with their set of tags. @@ -90,6 +97,7 @@ func (ts *tagStats) publish() { tracesPriorityNone := atomic.LoadInt64(&ts.TracesPriorityNone) tracesPriority0 := atomic.LoadInt64(&ts.TracesPriority0) tracesPriority1 := atomic.LoadInt64(&ts.TracesPriority1) + tracesPriority2 := atomic.LoadInt64(&ts.TracesPriority2) tracesBytes := atomic.LoadInt64(&ts.TracesBytes) spansReceived := atomic.LoadInt64(&ts.SpansReceived) spansDropped := atomic.LoadInt64(&ts.SpansDropped) @@ -107,6 +115,7 @@ func (ts *tagStats) publish() { statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriorityNone, append(tags, "priority:none"), 1) statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriority0, append(tags, "priority:0"), 1) statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriority1, append(tags, "priority:1"), 1) + statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriority2, append(tags, "priority:2"), 1) statsd.Client.Count("datadog.trace_agent.receiver.traces_bytes", tracesBytes, tags, 1) statsd.Client.Count("datadog.trace_agent.receiver.spans_received", spansReceived, tags, 1) statsd.Client.Count("datadog.trace_agent.receiver.spans_dropped", spansDropped, tags, 1) @@ -128,8 +137,10 @@ type Stats struct { TracesPriorityNone int64 // TracesPriority0 is the number of traces with sampling priority set to zero. TracesPriority0 int64 - // TracesPriority1 is the number of traces with sampling priority set to a non-zero value. + // TracesPriority1 is the number of traces with sampling priority automatically set to 1. TracesPriority1 int64 + // TracesPriority2 is the number of traces with sampling priority manually set to 2 or more. + TracesPriority2 int64 // TracesBytes is the amount of data received on the traces endpoint (raw data, encoded, compressed). TracesBytes int64 // SpansReceived is the total number of spans received, including the dropped ones. @@ -151,6 +162,7 @@ func (s *Stats) update(recent Stats) { atomic.AddInt64(&s.TracesPriorityNone, recent.TracesPriorityNone) atomic.AddInt64(&s.TracesPriority0, recent.TracesPriority0) atomic.AddInt64(&s.TracesPriority1, recent.TracesPriority1) + atomic.AddInt64(&s.TracesPriority2, recent.TracesPriority2) atomic.AddInt64(&s.TracesBytes, recent.TracesBytes) atomic.AddInt64(&s.SpansReceived, recent.SpansReceived) atomic.AddInt64(&s.SpansDropped, recent.SpansDropped) @@ -166,6 +178,7 @@ func (s *Stats) reset() { atomic.StoreInt64(&s.TracesPriorityNone, 0) atomic.StoreInt64(&s.TracesPriority0, 0) atomic.StoreInt64(&s.TracesPriority1, 0) + atomic.StoreInt64(&s.TracesPriority2, 0) atomic.StoreInt64(&s.TracesBytes, 0) atomic.StoreInt64(&s.SpansReceived, 0) atomic.StoreInt64(&s.SpansDropped, 0) @@ -174,6 +187,12 @@ func (s *Stats) reset() { atomic.StoreInt64(&s.ServicesBytes, 0) } +func (s *Stats) isEmpty() bool { + tracesBytes := atomic.LoadInt64(&s.TracesBytes) + + return tracesBytes == 0 +} + // String returns a string representation of the Stats struct func (s *Stats) String() string { // Atomically load the stats