diff --git a/agent/agent.go b/agent/agent.go index 994d98f6c..4306950d3 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -15,8 +15,11 @@ import ( "github.com/DataDog/datadog-trace-agent/watchdog" ) -const processStatsInterval = time.Minute -const languageHeaderKey = "X-Datadog-Reported-Languages" +const ( + processStatsInterval = time.Minute + languageHeaderKey = "X-Datadog-Reported-Languages" + samplingPriorityKey = "_sampling_priority_v1" +) type processedTrace struct { Trace model.Trace @@ -34,14 +37,16 @@ func (pt *processedTrace) weight() float64 { // Agent struct holds all the sub-routines structs and make the data flow between them type Agent struct { - Receiver *HTTPReceiver - Concentrator *Concentrator - Filters []filters.Filter - Sampler *Sampler - Writer *Writer + Receiver *HTTPReceiver + Concentrator *Concentrator + Filters []filters.Filter + ScoreEngine *Sampler + PriorityEngine *Sampler + Writer *Writer // config - conf *config.AgentConfig + conf *config.AgentConfig + dynConf *config.DynamicConfig // Used to synchronize on a clean exit exit chan struct{} @@ -53,26 +58,34 @@ type Agent struct { func NewAgent(conf *config.AgentConfig) *Agent { exit := make(chan struct{}) - r := NewHTTPReceiver(conf) + dynConf := config.NewDynamicConfig() + r := NewHTTPReceiver(conf, dynConf) c := NewConcentrator( conf.ExtraAggregators, conf.BucketInterval.Nanoseconds(), ) f := filters.Setup(conf) - s := NewSampler(conf) + ss := NewScoreEngine(conf) + var ps *Sampler + if conf.PrioritySampling { + // Use priority sampling for distributed tracing only if conf says so + ps = NewPriorityEngine(conf, dynConf) + } w := NewWriter(conf) w.inServices = r.services return &Agent{ - Receiver: r, - Concentrator: c, - Filters: f, - Sampler: s, - Writer: w, - conf: conf, - exit: exit, - die: die, + Receiver: r, + Concentrator: c, + Filters: f, + ScoreEngine: ss, + PriorityEngine: ps, + Writer: w, + conf: conf, + dynConf: dynConf, + exit: exit, + die: die, } } @@ -92,7 +105,10 @@ 
func (a *Agent) Run() { a.Receiver.Run() a.Writer.Run() - a.Sampler.Run() + a.ScoreEngine.Run() + if a.PriorityEngine != nil { + a.PriorityEngine.Run() + } for { select { @@ -112,7 +128,13 @@ func (a *Agent) Run() { }() go func() { defer watchdog.LogOnPanic() - p.Traces = a.Sampler.Flush() + // Serializing both flushes, classic agent sampler and distributed sampler, + // in most cases only one will be used, so in mainstream case there should + // be no performance issue, only in transitional mode can both contain data. + p.Traces = a.ScoreEngine.Flush() + if a.PriorityEngine != nil { + p.Traces = append(p.Traces, a.PriorityEngine.Flush()...) + } wg.Done() }() @@ -126,14 +148,17 @@ func (a *Agent) Run() { log.Info("exiting") close(a.Receiver.exit) a.Writer.Stop() - a.Sampler.Stop() + a.ScoreEngine.Stop() + if a.PriorityEngine != nil { + a.PriorityEngine.Stop() + } return } } } // Process is the default work unit that receives a trace, transforms it and -// passes it downstream +// passes it downstream. func (a *Agent) Process(t model.Trace) { if len(t) == 0 { // XXX Should never happen since we reject empty traces during @@ -143,12 +168,31 @@ func (a *Agent) Process(t model.Trace) { } root := t.GetRoot() + + // We get the address of the struct holding the stats associated to no tags + ts := a.Receiver.stats.getTagStats(Tags{}) + + // We choose the sampler dynamically, depending on trace content, + // it has a sampling priority info (whether 0 or 1 or more) we respect + // this by using priority sampler. Else, use default score sampler. 
+ s := a.ScoreEngine + priorityPtr := &ts.TracesPriorityNone + if a.PriorityEngine != nil { + if priority, ok := root.Metrics[samplingPriorityKey]; ok { + s = a.PriorityEngine + + if priority == 0 { + priorityPtr = &ts.TracesPriority0 + } else { + priorityPtr = &ts.TracesPriority1 + } + } + } + atomic.AddInt64(priorityPtr, 1) + if root.End() < model.Now()-2*a.conf.BucketInterval.Nanoseconds() { log.Errorf("skipping trace with root too far in past, root:%v", *root) - // We get the address of the struct holding the stats associated to the tags - ts := a.Receiver.stats.getTagStats(Tags{}) - atomic.AddInt64(&ts.TracesDropped, 1) atomic.AddInt64(&ts.SpansDropped, int64(len(t))) return @@ -160,7 +204,6 @@ func (a *Agent) Process(t model.Trace) { } log.Debugf("rejecting trace by filter: %T %v", f, *root) - ts := a.Receiver.stats.getTagStats(Tags{}) atomic.AddInt64(&ts.TracesFiltered, 1) atomic.AddInt64(&ts.SpansFiltered, int64(len(t))) @@ -201,7 +244,7 @@ func (a *Agent) Process(t model.Trace) { }() go func() { defer watchdog.LogOnPanic() - a.Sampler.Add(pt) + s.Add(pt) }() } diff --git a/agent/info.go b/agent/info.go index f626cc1d6..e4c8b4565 100644 --- a/agent/info.go +++ b/agent/info.go @@ -19,17 +19,19 @@ import ( ) var ( - infoMu sync.RWMutex - infoReceiverStats []tagStats // only for the last minute - infoEndpointStats endpointStats // only for the last minute - infoWatchdogInfo watchdog.Info - infoSamplerInfo samplerInfo - infoPreSamplerStats sampler.PreSamplerStats - infoStart = time.Now() - infoOnce sync.Once - infoTmpl *template.Template - infoNotRunningTmpl *template.Template - infoErrorTmpl *template.Template + infoMu sync.RWMutex + infoReceiverStats []tagStats // only for the last minute + infoEndpointStats endpointStats // only for the last minute + infoWatchdogInfo watchdog.Info + infoSamplerInfo samplerInfo + infoPrioritySamplerInfo samplerInfo + infoRateByService map[string]float64 + infoPreSamplerStats sampler.PreSamplerStats + infoStart = time.Now() + 
infoOnce sync.Once + infoTmpl *template.Template + infoNotRunningTmpl *template.Template + infoErrorTmpl *template.Template ) const ( @@ -44,21 +46,23 @@ const ( Hostname: {{.Status.Config.HostName}} Receiver: {{.Status.Config.ReceiverHost}}:{{.Status.Config.ReceiverPort}} API Endpoint: {{.Status.Config.APIEndpoint}}{{ range $i, $ts := .Status.Receiver }} - + --- Receiver stats (1 min) --- - {{if $ts.Tags.Lang}}-> tags: {{ $ts.Tags.Lang }}, {{ $ts.Tags.LangVersion }}, {{ $ts.Tags.Interpreter }}, {{ $ts.Tags.TracerVersion }} - {{else}}-> tags: None{{end}} + -> tags: {{if $ts.Tags.Lang}}{{ $ts.Tags.Lang }}, {{ $ts.Tags.LangVersion }}, {{ $ts.Tags.Interpreter }}, {{ $ts.Tags.TracerVersion }}{{else}}None{{end}} + Traces received: {{ $ts.Stats.TracesReceived }} ({{ $ts.Stats.TracesBytes }} bytes) Spans received: {{ $ts.Stats.SpansReceived }} Services received: {{ $ts.Stats.ServicesReceived }} ({{ $ts.Stats.ServicesBytes }} bytes) - Total data received : {{ add $ts.Stats.TracesBytes $ts.Stats.ServicesBytes }} bytes{{if gt $ts.Stats.TracesDropped 0}} - + Total data received: {{ add $ts.Stats.TracesBytes $ts.Stats.ServicesBytes }} bytes{{if gt $ts.Stats.TracesDropped 0}} + WARNING: Traces dropped: {{ $ts.Stats.TracesDropped }} {{end}}{{if gt $ts.Stats.SpansDropped 0}}WARNING: Spans dropped: {{ $ts.Stats.SpansDropped }}{{end}} - - ------------------------------{{end}}{{if lt .Status.PreSampler.Rate 1.0}} - + + ------------------------------{{end}} +{{ range $key, $value := .Status.RateByService }} + Sample rate for '{{ $key }}': {{percent $value}} %{{ end }}{{if lt .Status.PreSampler.Rate 1.0}} + WARNING: Pre-sampling traces: {{percent .Status.PreSampler.Rate}} % {{end}}{{if .Status.PreSampler.Error}} WARNING: Pre-sampler: {{.Status.PreSampler.Error}} {{end}} @@ -93,75 +97,101 @@ func publishUptime() interface{} { func updateReceiverStats(rs *receiverStats) { infoMu.Lock() - + defer infoMu.Unlock() rs.RLock() + defer rs.RUnlock() + s := make([]tagStats, 0, len(rs.Stats)) 
for _, tagStats := range rs.Stats { s = append(s, *tagStats) } - rs.RUnlock() infoReceiverStats = s - infoMu.Unlock() } func publishReceiverStats() interface{} { infoMu.RLock() - rs := infoReceiverStats - infoMu.RUnlock() - return rs + defer infoMu.RUnlock() + return infoReceiverStats } func updateEndpointStats(es endpointStats) { infoMu.Lock() + defer infoMu.Unlock() infoEndpointStats = es - infoMu.Unlock() } func publishEndpointStats() interface{} { infoMu.RLock() - es := infoEndpointStats - infoMu.RUnlock() - return es + defer infoMu.RUnlock() + return infoEndpointStats } func updateSamplerInfo(ss samplerInfo) { infoMu.Lock() + defer infoMu.Unlock() + infoSamplerInfo = ss - infoMu.Unlock() } func publishSamplerInfo() interface{} { infoMu.RLock() - ss := infoSamplerInfo - infoMu.RUnlock() - return ss + defer infoMu.RUnlock() + return infoSamplerInfo +} + +func updatePrioritySamplerInfo(ss samplerInfo) { + infoMu.Lock() + defer infoMu.Unlock() + + infoPrioritySamplerInfo = ss +} + +func publishPrioritySamplerInfo() interface{} { + infoMu.RLock() + defer infoMu.RUnlock() + return infoPrioritySamplerInfo +} + +func updateRateByService(rbs map[string]float64) { + infoMu.Lock() + defer infoMu.Unlock() + // remove the default service and env, it can be inferred from other + // values so has little added-value and could be confusing for users. + // Besides, if one still really wants it: + // curl http://localhost:8126/debug/vars would show it. 
+ delete(rbs, "service:,env:") + infoRateByService = rbs +} + +func publishRateByService() interface{} { + infoMu.RLock() + defer infoMu.RUnlock() + return infoRateByService } func updateWatchdogInfo(wi watchdog.Info) { infoMu.Lock() + defer infoMu.Unlock() infoWatchdogInfo = wi - infoMu.Unlock() } func publishWatchdogInfo() interface{} { infoMu.RLock() - wi := infoWatchdogInfo - infoMu.RUnlock() - return wi + defer infoMu.RUnlock() + return infoWatchdogInfo } func updatePreSampler(ss sampler.PreSamplerStats) { infoMu.Lock() + defer infoMu.Unlock() infoPreSamplerStats = ss - infoMu.Unlock() } func publishPreSamplerStats() interface{} { infoMu.RLock() - ss := infoPreSamplerStats - infoMu.RUnlock() - return ss + defer infoMu.RUnlock() + return infoPreSamplerStats } type infoVersion struct { @@ -206,6 +236,8 @@ func initInfo(conf *config.AgentConfig) error { expvar.Publish("receiver", expvar.Func(publishReceiverStats)) expvar.Publish("endpoint", expvar.Func(publishEndpointStats)) expvar.Publish("sampler", expvar.Func(publishSamplerInfo)) + expvar.Publish("prioritysampler", expvar.Func(publishPrioritySamplerInfo)) + expvar.Publish("ratebyservice", expvar.Func(publishRateByService)) expvar.Publish("watchdog", expvar.Func(publishWatchdogInfo)) expvar.Publish("presampler", expvar.Func(publishPreSamplerStats)) @@ -253,12 +285,13 @@ type StatusInfo struct { MemStats struct { Alloc uint64 } `json:"memstats"` - Version infoVersion `json:"version"` - Receiver []tagStats `json:"receiver"` - Endpoint endpointStats `json:"endpoint"` - Watchdog watchdog.Info `json:"watchdog"` - PreSampler sampler.PreSamplerStats `json:"presampler"` - Config config.AgentConfig `json:"config"` + Version infoVersion `json:"version"` + Receiver []tagStats `json:"receiver"` + RateByService map[string]float64 `json:"ratebyservice"` + Endpoint endpointStats `json:"endpoint"` + Watchdog watchdog.Info `json:"watchdog"` + PreSampler sampler.PreSamplerStats `json:"presampler"` + Config config.AgentConfig 
`json:"config"` } func getProgramBanner(version string) (string, string) { diff --git a/agent/info_test.go b/agent/info_test.go index 6c336ee39..39487fb52 100644 --- a/agent/info_test.go +++ b/agent/info_test.go @@ -33,17 +33,20 @@ Trace Agent (v 0.99.0) Hostname: localhost.localdomain Receiver: localhost:8126 API Endpoint: https://trace.agent.datadoghq.com - + --- Receiver stats (1 min) --- -> tags: None + Traces received: 0 (0 bytes) Spans received: 0 Services received: 0 (0 bytes) - Total data received : 0 bytes - + Total data received: 0 bytes + ------------------------------ + Sample rate for 'service:myapp,env:dev': 12.3 % + Bytes sent (1 min): 3591 Traces sent (1 min): 6 Stats sent (1 min): 60 @@ -61,21 +64,22 @@ Trace Agent (v 0.99.0) Hostname: localhost.localdomain Receiver: localhost:8126 API Endpoint: https://trace.agent.datadoghq.com - + --- Receiver stats (1 min) --- -> tags: python, 2.7.6, CPython, 0.9.0 - + Traces received: 70 (10679 bytes) Spans received: 984 Services received: 0 (0 bytes) - Total data received : 10679 bytes - + Total data received: 10679 bytes + WARNING: Traces dropped: 23 WARNING: Spans dropped: 184 - - ------------------------------ - + + ------------------------------ + + WARNING: Pre-sampling traces: 42.1 % WARNING: Pre-sampler: raising pre-sampling rate from 3.1 % to 5.0 % @@ -100,6 +104,7 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { "endpoint": {"TracesPayload":4,"TracesPayloadError":0,"TracesBytes":3245,"TracesCount":6,"TracesStats":60,"ServicesPayload":2,"ServicesPayloadError":0,"ServicesBytes":346}, "memstats": 
{"Alloc":773552,"TotalAlloc":773552,"Sys":3346432,"Lookups":6,"Mallocs":7231,"Frees":561,"HeapAlloc":773552,"HeapSys":1572864,"HeapIdle":49152,"HeapInuse":1523712,"HeapReleased":0,"HeapObjects":6670,"StackInuse":524288,"StackSys":524288,"MSpanInuse":24480,"MSpanSys":32768,"MCacheInuse":4800,"MCacheSys":16384,"BuckHashSys":2675,"GCSys":131072,"OtherSys":1066381,"NextGC":4194304,"LastGC":0,"PauseTotalNs":0,"PauseNs":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"PauseEnd":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"NumGC":0,"GCCPUFraction":0,"EnableGC":true,"DebugGC":false,"BySize":[{"Size":0,"Mallocs":0,"Frees":0},{"Size":8,"Mallocs":126,"Frees":0},{"Size":16,"Mallocs":825,"Frees":0},{"Size":32,"Mallocs":4208,"Frees":0},{"Size":48,"Mallocs":345,"Frees":0},{"Size":64,"Mallocs":262,"Frees":0},{"Size":80,"Mallocs":93,"Frees":0},{"Size":96,"Mallocs":70,"Frees":0},{"Size":112,"Mallocs":97,"Frees":0},{"Size":128,"Mallocs":24,"Frees":0},{"Size":144,"Mallocs":25,"Frees":0},{"Size":160,"Mallocs":57,"Frees":0},{"Size":176,"Mallocs":128,"Frees":0},{"Size":1
92,"Mallocs":13,"Frees":0},{"Size":208,"Mallocs":77,"Frees":0},{"Size":224,"Mallocs":3,"Frees":0},{"Size":240,"Mallocs":2,"Frees":0},{"Size":256,"Mallocs":17,"Frees":0},{"Size":288,"Mallocs":64,"Frees":0},{"Size":320,"Mallocs":12,"Frees":0},{"Size":352,"Mallocs":20,"Frees":0},{"Size":384,"Mallocs":1,"Frees":0},{"Size":416,"Mallocs":59,"Frees":0},{"Size":448,"Mallocs":0,"Frees":0},{"Size":480,"Mallocs":3,"Frees":0},{"Size":512,"Mallocs":2,"Frees":0},{"Size":576,"Mallocs":17,"Frees":0},{"Size":640,"Mallocs":6,"Frees":0},{"Size":704,"Mallocs":10,"Frees":0},{"Size":768,"Mallocs":0,"Frees":0},{"Size":896,"Mallocs":11,"Frees":0},{"Size":1024,"Mallocs":11,"Frees":0},{"Size":1152,"Mallocs":12,"Frees":0},{"Size":1280,"Mallocs":2,"Frees":0},{"Size":1408,"Mallocs":2,"Frees":0},{"Size":1536,"Mallocs":0,"Frees":0},{"Size":1664,"Mallocs":10,"Frees":0},{"Size":2048,"Mallocs":17,"Frees":0},{"Size":2304,"Mallocs":7,"Frees":0},{"Size":2560,"Mallocs":1,"Frees":0},{"Size":2816,"Mallocs":1,"Frees":0},{"Size":3072,"Mallocs":1,"Frees":0},{"Size":3328,"Mallocs":7,"Frees":0},{"Size":4096,"Mallocs":4,"Frees":0},{"Size":4608,"Mallocs":1,"Frees":0},{"Size":5376,"Mallocs":6,"Frees":0},{"Size":6144,"Mallocs":4,"Frees":0},{"Size":6400,"Mallocs":0,"Frees":0},{"Size":6656,"Mallocs":1,"Frees":0},{"Size":6912,"Mallocs":0,"Frees":0},{"Size":8192,"Mallocs":0,"Frees":0},{"Size":8448,"Mallocs":0,"Frees":0},{"Size":8704,"Mallocs":1,"Frees":0},{"Size":9472,"Mallocs":0,"Frees":0},{"Size":10496,"Mallocs":0,"Frees":0},{"Size":12288,"Mallocs":1,"Frees":0},{"Size":13568,"Mallocs":0,"Frees":0},{"Size":14080,"Mallocs":0,"Frees":0},{"Size":16384,"Mallocs":0,"Frees":0},{"Size":16640,"Mallocs":0,"Frees":0},{"Size":17664,"Mallocs":1,"Frees":0}]}, "pid": 38149, +"ratebyservice": {"service:myapp,env:dev":0.123}, "receiver": [{}], "presampler": {"Rate":1.0}, "uptime": 15, diff --git a/agent/receiver.go b/agent/receiver.go index 629bfd03f..4da95a8f1 100644 --- a/agent/receiver.go +++ b/agent/receiver.go @@ -43,6 +43,10 
@@ const ( // Traces: msgpack/JSON (Content-Type) slice of traces // Services: msgpack/JSON, map[string]map[string][string] v03 APIVersion = "v0.3" + // v04 + // Traces: msgpack/JSON (Content-Type) slice of traces + returns service sampling ratios + // Services: msgpack/JSON, map[string]map[string][string] + v04 APIVersion = "v0.4" ) // HTTPReceiver is a collector that uses HTTP protocol and just holds @@ -51,6 +55,7 @@ type HTTPReceiver struct { traces chan model.Trace services chan model.ServicesMetadata conf *config.AgentConfig + dynConf *config.DynamicConfig stats *receiverStats preSampler *sampler.PreSampler @@ -62,12 +67,13 @@ type HTTPReceiver struct { } // NewHTTPReceiver returns a pointer to a new HTTPReceiver -func NewHTTPReceiver(conf *config.AgentConfig) *HTTPReceiver { +func NewHTTPReceiver(conf *config.AgentConfig, dynConf *config.DynamicConfig) *HTTPReceiver { // use buffered channels so that handlers are not waiting on downstream processing return &HTTPReceiver{ traces: make(chan model.Trace, 5000), // about 1000 traces/sec for 5 sec services: make(chan model.ServicesMetadata, 50), conf: conf, + dynConf: dynConf, stats: newReceiverStats(), preSampler: sampler.NewPreSampler(conf.PreSampleRate), exit: make(chan struct{}), @@ -86,11 +92,13 @@ func (r *HTTPReceiver) Run() { http.HandleFunc("/v0.1/services", r.httpHandleWithVersion(v01, r.handleServices)) http.HandleFunc("/v0.2/traces", r.httpHandleWithVersion(v02, r.handleTraces)) http.HandleFunc("/v0.2/services", r.httpHandleWithVersion(v02, r.handleServices)) - - // current collector API http.HandleFunc("/v0.3/traces", r.httpHandleWithVersion(v03, r.handleTraces)) http.HandleFunc("/v0.3/services", r.httpHandleWithVersion(v03, r.handleServices)) + // current collector API + http.HandleFunc("/v0.4/traces", r.httpHandleWithVersion(v04, r.handleTraces)) + http.HandleFunc("/v0.4/services", r.httpHandleWithVersion(v04, r.handleServices)) + // expvar implicitely publishes "/debug/vars" on the same port addr 
:= fmt.Sprintf("%s:%d", r.conf.ReceiverHost, r.conf.ReceiverPort) @@ -168,6 +176,21 @@ func (r *HTTPReceiver) httpHandleWithVersion(v APIVersion, f func(APIVersion, ht }) } +func (r *HTTPReceiver) replyTraces(v APIVersion, w http.ResponseWriter) { + switch v { + case v01: + fallthrough + case v02: + fallthrough + case v03: + // Simple response, simply acknowledge with "OK" + HTTPOK(w) + case v04: + // Return the recommended sampling rate for each service as a JSON. + HTTPRateByService(w, r.dynConf) + } +} + // handleTraces knows how to handle a bunch of traces func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *http.Request) { if !r.preSampler.Sample(req) { @@ -180,7 +203,8 @@ func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *ht return } - HTTPOK(w) // We successfuly decoded the payload + // We successfully decoded the payload + r.replyTraces(v, w) // We parse the tags from the header tags := Tags{ @@ -220,11 +244,11 @@ func (r *HTTPReceiver) handleTraces(v APIVersion, w http.ResponseWriter, req *ht } else { atomic.AddInt64(&ts.SpansDropped, int64(spans-len(normTrace))) - // if our downstream consumer is slow, we drop the trace on the floor - // this is a safety net against us using too much memory - // when clients flood us select { case r.traces <- normTrace: + // if our downstream consumer is slow, we drop the trace on the floor + // this is a safety net against us using too much memory + // when clients flood us default: atomic.AddInt64(&ts.TracesDropped, 1) atomic.AddInt64(&ts.SpansDropped, int64(spans)) @@ -294,6 +318,10 @@ func (r *HTTPReceiver) logStats() { // We reset the stats accumulated during the last minute accStats.reset() lastLog = now + + // Also publish rates by service (they are updated by receiver) + rates := r.dynConf.RateByService.GetAll() + updateRateByService(rates) } } } @@ -344,6 +372,8 @@ func getTraces(v APIVersion, w http.ResponseWriter, req *http.Request) (model.Tr case v02: 
fallthrough case v03: + fallthrough + case v04: if err := decodeReceiverPayload(req.Body, &traces, v, contentType); err != nil { log.Errorf("cannot decode %s traces payload: %v", v, err) HTTPDecodingError(err, []string{tagTraceHandler, fmt.Sprintf("v:%s", v)}, w) diff --git a/agent/receiver_responses.go b/agent/receiver_responses.go index 5dcc7c115..5a8a9f0ee 100644 --- a/agent/receiver_responses.go +++ b/agent/receiver_responses.go @@ -1,18 +1,31 @@ package main import ( + "encoding/json" "fmt" "io" "net/http" + "github.com/DataDog/datadog-trace-agent/config" "github.com/DataDog/datadog-trace-agent/model" "github.com/DataDog/datadog-trace-agent/statsd" ) +const ( + receiverErrorKey = "datadog.trace_agent.receiver.error" +) + +// We encapsulate the answers in a container, this is to ease-up transition, +// should we add another field. +type traceResponse struct { + // All the sampling rates recommended, by service + Rates map[string]float64 `json:"rate_by_service"` +} + // HTTPFormatError is used for payload format errors func HTTPFormatError(tags []string, w http.ResponseWriter) { tags = append(tags, "error:format-error") - statsd.Client.Count("datadog.trace_agent.receiver.error", 1, tags, 1) + statsd.Client.Count(receiverErrorKey, 1, tags, 1) http.Error(w, "format-error", http.StatusUnsupportedMediaType) } @@ -29,7 +42,7 @@ func HTTPDecodingError(err error, tags []string, w http.ResponseWriter) { } tags = append(tags, fmt.Sprintf("error:%s", errtag)) - statsd.Client.Count("datadog.trace_agent.receiver.error", 1, tags, 1) + statsd.Client.Count(receiverErrorKey, 1, tags, 1) http.Error(w, msg, status) } @@ -37,7 +50,7 @@ func HTTPDecodingError(err error, tags []string, w http.ResponseWriter) { // HTTPEndpointNotSupported is for payloads getting sent to a wrong endpoint func HTTPEndpointNotSupported(tags []string, w http.ResponseWriter) { tags = append(tags, "error:unsupported-endpoint") - statsd.Client.Count("datadog.trace_agent.receiver.error", 1, tags, 1) 
+ statsd.Client.Count(receiverErrorKey, 1, tags, 1) http.Error(w, "unsupported-endpoint", http.StatusInternalServerError) } @@ -46,3 +59,17 @@ func HTTPOK(w http.ResponseWriter) { w.WriteHeader(http.StatusOK) io.WriteString(w, "OK\n") } + +// HTTPRateByService outputs, as a JSON, the recommended sampling rates for all services. +func HTTPRateByService(w http.ResponseWriter, dynConf *config.DynamicConfig) { + w.WriteHeader(http.StatusOK) + response := traceResponse{ + Rates: dynConf.RateByService.GetAll(), // this is thread-safe + } + encoder := json.NewEncoder(w) + if err := encoder.Encode(response); err != nil { + tags := []string{"error:response-error"} + statsd.Client.Count(receiverErrorKey, 1, tags, 1) + return + } +} diff --git a/agent/receiver_test.go b/agent/receiver_test.go index cfc6e421b..736888b0f 100644 --- a/agent/receiver_test.go +++ b/agent/receiver_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io/ioutil" "net/http" "net/http/httptest" "testing" @@ -32,12 +33,13 @@ func TestReceiverRequestBodyLength(t *testing.T) { conf := config.NewDefaultAgentConfig() conf.APIKey = "test" + dynConf := config.NewDynamicConfig() // save the global mux aside, we don't want to break other tests defaultMux := http.DefaultServeMux http.DefaultServeMux = http.NewServeMux() - receiver := NewHTTPReceiver(conf) + receiver := NewHTTPReceiver(conf, dynConf) receiver.maxRequestBodyLength = 2 go receiver.Run() @@ -49,7 +51,7 @@ func TestReceiverRequestBodyLength(t *testing.T) { http.DefaultServeMux = defaultMux }() - url := fmt.Sprintf("http://%s:%d/v0.3/traces", + url := fmt.Sprintf("http://%s:%d/v0.4/traces", conf.ReceiverHost, conf.ReceiverPort) // Before going further, make sure receiver is started @@ -87,7 +89,8 @@ func TestReceiverRequestBodyLength(t *testing.T) { func TestLegacyReceiver(t *testing.T) { // testing traces without content-type in agent endpoints, it should use JSON decoding assert := assert.New(t) - config := 
config.NewDefaultAgentConfig() + conf := config.NewDefaultAgentConfig() + dynConf := config.NewDynamicConfig() testCases := []struct { name string r *HTTPReceiver @@ -95,8 +98,8 @@ func TestLegacyReceiver(t *testing.T) { contentType string traces model.Trace }{ - {"v01 with empty content-type", NewHTTPReceiver(config), v01, "", model.Trace{fixtures.GetTestSpan()}}, - {"v01 with application/json", NewHTTPReceiver(config), v01, "application/json", model.Trace{fixtures.GetTestSpan()}}, + {"v01 with empty content-type", NewHTTPReceiver(conf, dynConf), v01, "", model.Trace{fixtures.GetTestSpan()}}, + {"v01 with application/json", NewHTTPReceiver(conf, dynConf), v01, "application/json", model.Trace{fixtures.GetTestSpan()}}, } for _, tc := range testCases { @@ -143,7 +146,8 @@ func TestLegacyReceiver(t *testing.T) { func TestReceiverJSONDecoder(t *testing.T) { // testing traces without content-type in agent endpoints, it should use JSON decoding assert := assert.New(t) - config := config.NewDefaultAgentConfig() + conf := config.NewDefaultAgentConfig() + dynConf := config.NewDynamicConfig() testCases := []struct { name string r *HTTPReceiver @@ -151,12 +155,15 @@ func TestReceiverJSONDecoder(t *testing.T) { contentType string traces []model.Trace }{ - {"v02 with empty content-type", NewHTTPReceiver(config), v02, "", fixtures.GetTestTrace(1, 1)}, - {"v03 with empty content-type", NewHTTPReceiver(config), v03, "", fixtures.GetTestTrace(1, 1)}, - {"v02 with application/json", NewHTTPReceiver(config), v02, "application/json", fixtures.GetTestTrace(1, 1)}, - {"v03 with application/json", NewHTTPReceiver(config), v03, "application/json", fixtures.GetTestTrace(1, 1)}, - {"v02 with text/json", NewHTTPReceiver(config), v02, "text/json", fixtures.GetTestTrace(1, 1)}, - {"v03 with text/json", NewHTTPReceiver(config), v03, "text/json", fixtures.GetTestTrace(1, 1)}, + {"v02 with empty content-type", NewHTTPReceiver(conf, dynConf), v02, "", fixtures.GetTestTrace(1, 1)}, + {"v03 with 
empty content-type", NewHTTPReceiver(conf, dynConf), v03, "", fixtures.GetTestTrace(1, 1)}, + {"v04 with empty content-type", NewHTTPReceiver(conf, dynConf), v04, "", fixtures.GetTestTrace(1, 1)}, + {"v02 with application/json", NewHTTPReceiver(conf, dynConf), v02, "application/json", fixtures.GetTestTrace(1, 1)}, + {"v03 with application/json", NewHTTPReceiver(conf, dynConf), v03, "application/json", fixtures.GetTestTrace(1, 1)}, + {"v04 with application/json", NewHTTPReceiver(conf, dynConf), v04, "application/json", fixtures.GetTestTrace(1, 1)}, + {"v02 with text/json", NewHTTPReceiver(conf, dynConf), v02, "text/json", fixtures.GetTestTrace(1, 1)}, + {"v03 with text/json", NewHTTPReceiver(conf, dynConf), v03, "text/json", fixtures.GetTestTrace(1, 1)}, + {"v04 with text/json", NewHTTPReceiver(conf, dynConf), v04, "text/json", fixtures.GetTestTrace(1, 1)}, } for _, tc := range testCases { @@ -204,7 +211,8 @@ func TestReceiverMsgpackDecoder(t *testing.T) { // testing traces without content-type in agent endpoints, it should use Msgpack decoding // or it should raise a 415 Unsupported media type assert := assert.New(t) - config := config.NewDefaultAgentConfig() + conf := config.NewDefaultAgentConfig() + dynConf := config.NewDynamicConfig() testCases := []struct { name string r *HTTPReceiver @@ -212,9 +220,10 @@ func TestReceiverMsgpackDecoder(t *testing.T) { contentType string traces model.Traces }{ - {"v01 with application/msgpack", NewHTTPReceiver(config), v01, "application/msgpack", fixtures.GetTestTrace(1, 1)}, - {"v02 with application/msgpack", NewHTTPReceiver(config), v02, "application/msgpack", fixtures.GetTestTrace(1, 1)}, - {"v03 with application/msgpack", NewHTTPReceiver(config), v03, "application/msgpack", fixtures.GetTestTrace(1, 1)}, + {"v01 with application/msgpack", NewHTTPReceiver(conf, dynConf), v01, "application/msgpack", fixtures.GetTestTrace(1, 1)}, + {"v02 with application/msgpack", NewHTTPReceiver(conf, dynConf), v02, "application/msgpack", 
fixtures.GetTestTrace(1, 1)}, + {"v03 with application/msgpack", NewHTTPReceiver(conf, dynConf), v03, "application/msgpack", fixtures.GetTestTrace(1, 1)}, + {"v04 with application/msgpack", NewHTTPReceiver(conf, dynConf), v04, "application/msgpack", fixtures.GetTestTrace(1, 1)}, } for _, tc := range testCases { @@ -259,6 +268,34 @@ func TestReceiverMsgpackDecoder(t *testing.T) { default: t.Fatalf("no data received") } + + body, err := ioutil.ReadAll(resp.Body) + assert.Nil(err) + assert.Equal("OK\n", string(body)) + case v04: + assert.Equal(200, resp.StatusCode) + + // now we should be able to read the trace data + select { + case rt := <-tc.r.traces: + assert.Len(rt, 1) + span := rt[0] + assert.Equal(uint64(42), span.TraceID) + assert.Equal(uint64(52), span.SpanID) + assert.Equal("fennel_is_amazing", span.Service) + assert.Equal("something_that_should_be_a_metric", span.Name) + assert.Equal("NOT touched because it is going to be hashed", span.Resource) + assert.Equal("192.168.0.1", span.Meta["http.host"]) + assert.Equal(41.99, span.Metrics["http.monitor"]) + default: + t.Fatalf("no data received") + } + + body, err := ioutil.ReadAll(resp.Body) + assert.Nil(err) + var tr traceResponse + err = json.Unmarshal(body, &tr) + assert.Nil(err, "the answer should be a valid JSON") } resp.Body.Close() @@ -270,22 +307,26 @@ func TestReceiverMsgpackDecoder(t *testing.T) { func TestReceiverServiceJSONDecoder(t *testing.T) { // testing traces without content-type in agent endpoints, it should use JSON decoding assert := assert.New(t) - config := config.NewDefaultAgentConfig() + conf := config.NewDefaultAgentConfig() + dynConf := config.NewDynamicConfig() testCases := []struct { name string r *HTTPReceiver apiVersion APIVersion contentType string }{ - {"v01 with empty content-type", NewHTTPReceiver(config), v01, ""}, - {"v02 with empty content-type", NewHTTPReceiver(config), v02, ""}, - {"v03 with empty content-type", NewHTTPReceiver(config), v03, ""}, - {"v01 with 
application/json", NewHTTPReceiver(config), v01, "application/json"}, - {"v02 with application/json", NewHTTPReceiver(config), v02, "application/json"}, - {"v03 with application/json", NewHTTPReceiver(config), v03, "application/json"}, - {"v01 with text/json", NewHTTPReceiver(config), v01, "text/json"}, - {"v02 with text/json", NewHTTPReceiver(config), v02, "text/json"}, - {"v03 with text/json", NewHTTPReceiver(config), v03, "text/json"}, + {"v01 with empty content-type", NewHTTPReceiver(conf, dynConf), v01, ""}, + {"v02 with empty content-type", NewHTTPReceiver(conf, dynConf), v02, ""}, + {"v03 with empty content-type", NewHTTPReceiver(conf, dynConf), v03, ""}, + {"v04 with empty content-type", NewHTTPReceiver(conf, dynConf), v04, ""}, + {"v01 with application/json", NewHTTPReceiver(conf, dynConf), v01, "application/json"}, + {"v02 with application/json", NewHTTPReceiver(conf, dynConf), v02, "application/json"}, + {"v03 with application/json", NewHTTPReceiver(conf, dynConf), v03, "application/json"}, + {"v04 with application/json", NewHTTPReceiver(conf, dynConf), v04, "application/json"}, + {"v01 with text/json", NewHTTPReceiver(conf, dynConf), v01, "text/json"}, + {"v02 with text/json", NewHTTPReceiver(conf, dynConf), v02, "text/json"}, + {"v03 with text/json", NewHTTPReceiver(conf, dynConf), v03, "text/json"}, + {"v04 with text/json", NewHTTPReceiver(conf, dynConf), v04, "text/json"}, } for _, tc := range testCases { @@ -341,16 +382,18 @@ func TestReceiverServiceMsgpackDecoder(t *testing.T) { // testing traces without content-type in agent endpoints, it should use Msgpack decoding // or it should raise a 415 Unsupported media type assert := assert.New(t) - config := config.NewDefaultAgentConfig() + conf := config.NewDefaultAgentConfig() + dynConf := config.NewDynamicConfig() testCases := []struct { name string r *HTTPReceiver apiVersion APIVersion contentType string }{ - {"v01 with application/msgpack", NewHTTPReceiver(config), v01, "application/msgpack"}, - 
{"v02 with application/msgpack", NewHTTPReceiver(config), v02, "application/msgpack"}, - {"v03 with application/msgpack", NewHTTPReceiver(config), v03, "application/msgpack"}, + {"v01 with application/msgpack", NewHTTPReceiver(conf, dynConf), v01, "application/msgpack"}, + {"v02 with application/msgpack", NewHTTPReceiver(conf, dynConf), v02, "application/msgpack"}, + {"v03 with application/msgpack", NewHTTPReceiver(conf, dynConf), v03, "application/msgpack"}, + {"v04 with application/msgpack", NewHTTPReceiver(conf, dynConf), v04, "application/msgpack"}, } for _, tc := range testCases { @@ -403,6 +446,28 @@ func TestReceiverServiceMsgpackDecoder(t *testing.T) { default: t.Fatalf("no data received") } + + body, err := ioutil.ReadAll(resp.Body) + assert.Nil(err) + assert.Equal("OK\n", string(body)) + case v04: + assert.Equal(200, resp.StatusCode) + + // now we should be able to read the trace data + select { + case rt := <-tc.r.services: + assert.Len(rt, 2) + assert.Equal(rt["backend"]["app"], "django") + assert.Equal(rt["backend"]["app_type"], "web") + assert.Equal(rt["database"]["app"], "postgres") + assert.Equal(rt["database"]["app_type"], "db") + default: + t.Fatalf("no data received") + } + + body, err := ioutil.ReadAll(resp.Body) + assert.Nil(err) + assert.Equal("OK\n", string(body)) } resp.Body.Close() @@ -419,12 +484,13 @@ func TestHandleTraces(t *testing.T) { msgp.Encode(&buf, fixtures.GetTestTrace(10, 10)) // prepare the receiver - config := config.NewDefaultAgentConfig() - config.APIKey = "test" - receiver := NewHTTPReceiver(config) + conf := config.NewDefaultAgentConfig() + conf.APIKey = "test" + dynConf := config.NewDynamicConfig() + receiver := NewHTTPReceiver(conf, dynConf) // response recorder - handler := http.HandlerFunc(receiver.httpHandleWithVersion(v03, receiver.handleTraces)) + handler := http.HandlerFunc(receiver.httpHandleWithVersion(v04, receiver.handleTraces)) for n := 0; n < 10; n++ { // consume the traces channel without doing anything @@ 
-435,7 +501,7 @@ func TestHandleTraces(t *testing.T) { // forge the request rr := httptest.NewRecorder() - req, _ := http.NewRequest("POST", "/v0.3/traces", bytes.NewReader(buf.Bytes())) + req, _ := http.NewRequest("POST", "/v0.4/traces", bytes.NewReader(buf.Bytes())) req.Header.Set("Content-Type", "application/msgpack") // Add meta data to simulate data comming from multiple applications @@ -475,12 +541,13 @@ func BenchmarkHandleTracesFromOneApp(b *testing.B) { msgp.Encode(&buf, fixtures.GetTestTrace(1, 1)) // prepare the receiver - config := config.NewDefaultAgentConfig() - config.APIKey = "test" - receiver := NewHTTPReceiver(config) + conf := config.NewDefaultAgentConfig() + dynConf := config.NewDynamicConfig() + conf.APIKey = "test" + receiver := NewHTTPReceiver(conf, dynConf) // response recorder - handler := http.HandlerFunc(receiver.httpHandleWithVersion(v03, receiver.handleTraces)) + handler := http.HandlerFunc(receiver.httpHandleWithVersion(v04, receiver.handleTraces)) // benchmark b.ResetTimer() @@ -495,7 +562,7 @@ func BenchmarkHandleTracesFromOneApp(b *testing.B) { // forge the request rr := httptest.NewRecorder() - req, _ := http.NewRequest("POST", "/v0.3/traces", bytes.NewReader(buf.Bytes())) + req, _ := http.NewRequest("POST", "/v0.4/traces", bytes.NewReader(buf.Bytes())) req.Header.Set("Content-Type", "application/msgpack") // Add meta data to simulate data comming from multiple applications @@ -516,12 +583,13 @@ func BenchmarkHandleTracesFromMultipleApps(b *testing.B) { msgp.Encode(&buf, fixtures.GetTestTrace(1, 1)) // prepare the receiver - config := config.NewDefaultAgentConfig() - config.APIKey = "test" - receiver := NewHTTPReceiver(config) + conf := config.NewDefaultAgentConfig() + conf.APIKey = "test" + dynConf := config.NewDynamicConfig() + receiver := NewHTTPReceiver(conf, dynConf) // response recorder - handler := http.HandlerFunc(receiver.httpHandleWithVersion(v03, receiver.handleTraces)) + handler := 
http.HandlerFunc(receiver.httpHandleWithVersion(v04, receiver.handleTraces)) // benchmark b.ResetTimer() @@ -536,7 +604,7 @@ func BenchmarkHandleTracesFromMultipleApps(b *testing.B) { // forge the request rr := httptest.NewRecorder() - req, _ := http.NewRequest("POST", "/v0.3/traces", bytes.NewReader(buf.Bytes())) + req, _ := http.NewRequest("POST", "/v0.4/traces", bytes.NewReader(buf.Bytes())) req.Header.Set("Content-Type", "application/msgpack") // Add meta data to simulate data comming from multiple applications diff --git a/agent/sampler.go b/agent/sampler.go index 32a91c0bc..0f6b25ccd 100644 --- a/agent/sampler.go +++ b/agent/sampler.go @@ -1,6 +1,8 @@ package main import ( + "fmt" + "reflect" "sync" "time" @@ -19,7 +21,7 @@ type Sampler struct { traceCount int lastFlush time.Time - samplerEngine SamplerEngine + engine sampler.Engine } // samplerStats contains sampler statistics @@ -31,25 +33,29 @@ type samplerStats struct { } type samplerInfo struct { + // EngineType contains the type of the engine (tells old sampler and new distributed sampler apart) + EngineType string // Stats contains statistics about what the sampler is doing. 
Stats samplerStats // State is the internal state of the sampler (for debugging mostly) State sampler.InternalState } -// SamplerEngine cares about telling if a trace is a proper sample or not -type SamplerEngine interface { - Run() - Stop() - Sample(t model.Trace, root *model.Span, env string) bool +// NewScoreEngine creates a new empty sampler ready to be started +func NewScoreEngine(conf *config.AgentConfig) *Sampler { + return &Sampler{ + sampledTraces: []model.Trace{}, + traceCount: 0, + engine: sampler.NewScoreEngine(conf.ExtraSampleRate, conf.MaxTPS), + } } -// NewSampler creates a new empty sampler ready to be started -func NewSampler(conf *config.AgentConfig) *Sampler { +// NewPriorityEngine creates a new empty distributed sampler ready to be started +func NewPriorityEngine(conf *config.AgentConfig, dynConf *config.DynamicConfig) *Sampler { return &Sampler{ sampledTraces: []model.Trace{}, traceCount: 0, - samplerEngine: sampler.NewSampler(conf.ExtraSampleRate, conf.MaxTPS), + engine: sampler.NewPriorityEngine(conf.ExtraSampleRate, conf.MaxTPS, &dynConf.RateByService), } } @@ -57,7 +63,7 @@ func NewSampler(conf *config.AgentConfig) *Sampler { func (s *Sampler) Run() { go func() { defer watchdog.LogOnPanic() - s.samplerEngine.Run() + s.engine.Run() }() } @@ -65,7 +71,7 @@ func (s *Sampler) Run() { func (s *Sampler) Add(t processedTrace) { s.mu.Lock() s.traceCount++ - if s.samplerEngine.Sample(t.Trace, t.Root, t.Env) { + if s.engine.Sample(t.Trace, t.Root, t.Env) { s.sampledTraces = append(s.sampledTraces, t.Trace) } s.mu.Unlock() @@ -73,7 +79,7 @@ func (s *Sampler) Add(t processedTrace) { // Stop stops the sampler func (s *Sampler) Stop() { - s.samplerEngine.Stop() + s.engine.Stop() } // Flush returns representative spans based on GetSamples and reset its internal memory @@ -91,19 +97,30 @@ func (s *Sampler) Flush() []model.Trace { s.mu.Unlock() - state := s.samplerEngine.(*sampler.Sampler).GetState() - var stats samplerStats - if duration > 0 { - 
stats.KeptTPS = float64(len(traces)) / duration.Seconds() - stats.TotalTPS = float64(traceCount) / duration.Seconds() + state := s.engine.GetState() + + switch state := state.(type) { + case sampler.InternalState: + var stats samplerStats + if duration > 0 { + stats.KeptTPS = float64(len(traces)) / duration.Seconds() + stats.TotalTPS = float64(traceCount) / duration.Seconds() + } + + log.Debugf("flushed %d sampled traces out of %d", len(traces), traceCount) + log.Debugf("inTPS: %f, outTPS: %f, maxTPS: %f, offset: %f, slope: %f, cardinality: %d", + state.InTPS, state.OutTPS, state.MaxTPS, state.Offset, state.Slope, state.Cardinality) + + // publish through expvar + switch s.engine.(type) { + case *sampler.ScoreEngine: + updateSamplerInfo(samplerInfo{EngineType: fmt.Sprint(reflect.TypeOf(s.engine)), Stats: stats, State: state}) + case *sampler.PriorityEngine: + updatePrioritySamplerInfo(samplerInfo{EngineType: fmt.Sprint(reflect.TypeOf(s.engine)), Stats: stats, State: state}) + } + default: + log.Debugf("unhandled sampler engine, can't log state") } - log.Debugf("flushed %d sampled traces out of %d", len(traces), traceCount) - log.Debugf("inTPS: %f, outTPS: %f, maxTPS: %f, offset: %f, slope: %f, cardinality: %d", - state.InTPS, state.OutTPS, state.MaxTPS, state.Offset, state.Slope, state.Cardinality) - - // publish through expvar - updateSamplerInfo(samplerInfo{Stats: stats, State: state}) - return traces } diff --git a/agent/stats.go b/agent/stats.go index 0bd4ef797..6e34c1875 100644 --- a/agent/stats.go +++ b/agent/stats.go @@ -87,6 +87,9 @@ func (ts *tagStats) publish() { tracesReceived := atomic.LoadInt64(&ts.TracesReceived) tracesDropped := atomic.LoadInt64(&ts.TracesDropped) tracesFiltered := atomic.LoadInt64(&ts.TracesFiltered) + tracesPriorityNone := atomic.LoadInt64(&ts.TracesPriorityNone) + tracesPriority0 := atomic.LoadInt64(&ts.TracesPriority0) + tracesPriority1 := atomic.LoadInt64(&ts.TracesPriority1) tracesBytes := atomic.LoadInt64(&ts.TracesBytes) 
spansReceived := atomic.LoadInt64(&ts.SpansReceived) spansDropped := atomic.LoadInt64(&ts.SpansDropped) @@ -95,16 +98,21 @@ func (ts *tagStats) publish() { servicesBytes := atomic.LoadInt64(&ts.ServicesBytes) // Publish the stats - statsd.Client.Count("datadog.trace_agent.receiver.trace", tracesReceived, ts.Tags.toArray(), 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_received", tracesReceived, ts.Tags.toArray(), 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_dropped", tracesDropped, ts.Tags.toArray(), 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_filtered", tracesFiltered, ts.Tags.toArray(), 1) - statsd.Client.Count("datadog.trace_agent.receiver.traces_bytes", tracesBytes, ts.Tags.toArray(), 1) - statsd.Client.Count("datadog.trace_agent.receiver.spans_received", spansReceived, ts.Tags.toArray(), 1) - statsd.Client.Count("datadog.trace_agent.receiver.spans_dropped", spansDropped, ts.Tags.toArray(), 1) - statsd.Client.Count("datadog.trace_agent.receiver.spans_filtered", spansFiltered, ts.Tags.toArray(), 1) - statsd.Client.Count("datadog.trace_agent.receiver.services_received", servicesReceived, ts.Tags.toArray(), 1) - statsd.Client.Count("datadog.trace_agent.receiver.services_bytes", servicesBytes, ts.Tags.toArray(), 1) + tags := ts.Tags.toArray() + + statsd.Client.Count("datadog.trace_agent.receiver.trace", tracesReceived, tags, 1) + statsd.Client.Count("datadog.trace_agent.receiver.traces_received", tracesReceived, tags, 1) + statsd.Client.Count("datadog.trace_agent.receiver.traces_dropped", tracesDropped, tags, 1) + statsd.Client.Count("datadog.trace_agent.receiver.traces_filtered", tracesFiltered, tags, 1) + statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriorityNone, append(tags, "priority:none"), 1) + statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", tracesPriority0, append(tags, "priority:0"), 1) + statsd.Client.Count("datadog.trace_agent.receiver.traces_priority", 
tracesPriority1, append(tags, "priority:1"), 1) + statsd.Client.Count("datadog.trace_agent.receiver.traces_bytes", tracesBytes, tags, 1) + statsd.Client.Count("datadog.trace_agent.receiver.spans_received", spansReceived, tags, 1) + statsd.Client.Count("datadog.trace_agent.receiver.spans_dropped", spansDropped, tags, 1) + statsd.Client.Count("datadog.trace_agent.receiver.spans_filtered", spansFiltered, tags, 1) + statsd.Client.Count("datadog.trace_agent.receiver.services_received", servicesReceived, tags, 1) + statsd.Client.Count("datadog.trace_agent.receiver.services_bytes", servicesBytes, tags, 1) } // Stats holds the metrics that will be reported every 10s by the agent. @@ -114,8 +122,14 @@ type Stats struct { TracesReceived int64 // TracesDropped is the number of traces dropped. TracesDropped int64 - // TracesDropped is the number of traces filtered. + // TracesFiltered is the number of traces filtered. TracesFiltered int64 + // TracesPriorityNone is the number of traces with no sampling priority. + TracesPriorityNone int64 + // TracesPriority0 is the number of traces with sampling priority set to zero. + TracesPriority0 int64 + // TracesPriority1 is the number of traces with sampling priority set to a non-zero value. + TracesPriority1 int64 // TracesBytes is the amount of data received on the traces endpoint (raw data, encoded, compressed). TracesBytes int64 // SpansReceived is the total number of spans received, including the dropped ones. 
@@ -134,6 +148,9 @@ func (s *Stats) update(recent Stats) { atomic.AddInt64(&s.TracesReceived, recent.TracesReceived) atomic.AddInt64(&s.TracesDropped, recent.TracesDropped) atomic.AddInt64(&s.TracesFiltered, recent.TracesFiltered) + atomic.AddInt64(&s.TracesPriorityNone, recent.TracesPriorityNone) + atomic.AddInt64(&s.TracesPriority0, recent.TracesPriority0) + atomic.AddInt64(&s.TracesPriority1, recent.TracesPriority1) atomic.AddInt64(&s.TracesBytes, recent.TracesBytes) atomic.AddInt64(&s.SpansReceived, recent.SpansReceived) atomic.AddInt64(&s.SpansDropped, recent.SpansDropped) @@ -146,6 +163,9 @@ func (s *Stats) reset() { atomic.StoreInt64(&s.TracesReceived, 0) atomic.StoreInt64(&s.TracesDropped, 0) atomic.StoreInt64(&s.TracesFiltered, 0) + atomic.StoreInt64(&s.TracesPriorityNone, 0) + atomic.StoreInt64(&s.TracesPriority0, 0) + atomic.StoreInt64(&s.TracesPriority1, 0) atomic.StoreInt64(&s.TracesBytes, 0) atomic.StoreInt64(&s.SpansReceived, 0) atomic.StoreInt64(&s.SpansDropped, 0) @@ -156,17 +176,19 @@ func (s *Stats) reset() { // String returns a string representation of the Stats struct func (s *Stats) String() string { - // Atomically load the stas + // Atomically load the stats tracesReceived := atomic.LoadInt64(&s.TracesReceived) tracesDropped := atomic.LoadInt64(&s.TracesDropped) tracesFiltered := atomic.LoadInt64(&s.TracesFiltered) + // Omitting priority information, use expvar or metrics for debugging purpose tracesBytes := atomic.LoadInt64(&s.TracesBytes) servicesReceived := atomic.LoadInt64(&s.ServicesReceived) servicesBytes := atomic.LoadInt64(&s.ServicesBytes) - return fmt.Sprintf("traces received: %v, traces dropped: %v, traces filtered: %v, "+ - "traces amount: %v bytes, services received: %v, services amount: %v bytes", - tracesReceived, tracesDropped, tracesFiltered, tracesBytes, servicesReceived, servicesBytes) + return fmt.Sprintf("traces received: %d, traces dropped: %d, traces filtered: %d, "+ + "traces amount: %d bytes, services received: %d, 
services amount: %d bytes", + tracesReceived, tracesDropped, tracesFiltered, + tracesBytes, servicesReceived, servicesBytes) } // Tags holds the tags we parse when we handle the header of the payload. diff --git a/config/agent.go b/config/agent.go index 9ec192abc..2c82d4e30 100644 --- a/config/agent.go +++ b/config/agent.go @@ -38,9 +38,11 @@ type AgentConfig struct { ExtraAggregators []string // Sampler configuration - ExtraSampleRate float64 - PreSampleRate float64 - MaxTPS float64 + ExtraSampleRate float64 + PreSampleRate float64 + MaxTPS float64 + PrioritySampling bool + PrioritySamplerTimeout time.Duration // Receiver ReceiverHost string @@ -170,9 +172,11 @@ func NewDefaultAgentConfig() *AgentConfig { BucketInterval: time.Duration(10) * time.Second, ExtraAggregators: []string{"http.status_code"}, - ExtraSampleRate: 1.0, - PreSampleRate: 1.0, - MaxTPS: 10, + ExtraSampleRate: 1.0, + PreSampleRate: 1.0, + MaxTPS: 10, + PrioritySampling: true, + PrioritySamplerTimeout: time.Hour, ReceiverHost: "localhost", ReceiverPort: 8126, @@ -322,6 +326,12 @@ APM_CONF: if v, e := conf.GetFloat("trace.sampler", "max_traces_per_second"); e == nil { c.MaxTPS = v } + if v := strings.ToLower(conf.GetDefault("trace.sampler", "priority_sampling", "")); v == "yes" || v == "true" { + c.PrioritySampling = true + } + if v, e := conf.GetInt("trace.sampler", "priority_sampler_timeout"); e == nil { + c.PrioritySamplerTimeout = time.Duration(v) * time.Second + } if v, e := conf.GetInt("trace.receiver", "receiver_port"); e == nil { c.ReceiverPort = v diff --git a/config/dynamicconfig.go b/config/dynamicconfig.go new file mode 100644 index 000000000..4912b5515 --- /dev/null +++ b/config/dynamicconfig.go @@ -0,0 +1,16 @@ +package config + +// DynamicConfig contains configuration items which may change +// dynamically over time. +type DynamicConfig struct { + // RateByService contains the rate for each service/env tuple, + // used in priority sampling by client libs. 
+ RateByService RateByService +} + +// NewDynamicConfig creates a new dynamic config object. +func NewDynamicConfig() *DynamicConfig { + // Not much logic here now, as RateByService is fine with + // being used uninitialized, but external packages should use this. + return &DynamicConfig{} +} diff --git a/config/dynamicconfig_test.go b/config/dynamicconfig_test.go new file mode 100644 index 000000000..88f12ee6e --- /dev/null +++ b/config/dynamicconfig_test.go @@ -0,0 +1,22 @@ +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewDynamicConfig(t *testing.T) { + assert := assert.New(t) + + dc := NewDynamicConfig() + assert.NotNil(dc) + + rates := map[string]float64{"service:myservice,env:myenv": 0.5} + + // Not doing a complete test of the different components of dynamic config, + // but still assessing it can do the bare minimum once initialized. + dc.RateByService.SetAll(rates) + rbs := dc.RateByService.GetAll() + assert.Equal(rates, rbs) +} diff --git a/config/ratebyservice.go b/config/ratebyservice.go new file mode 100644 index 000000000..135e98e58 --- /dev/null +++ b/config/ratebyservice.go @@ -0,0 +1,50 @@ +package config + +import ( + "sync" +) + +// RateByService stores the sampling rate per service. It is thread-safe, so +// one can read/write on it concurrently, using getters and setters. +type RateByService struct { + rates map[string]float64 + mutex sync.RWMutex +} + +// SetAll the sampling rate for all services. If a service/env is not +// in the map, then the entry is removed.
+func (rbs *RateByService) SetAll(rates map[string]float64) { + rbs.mutex.Lock() + defer rbs.mutex.Unlock() + + if rbs.rates == nil { + rbs.rates = make(map[string]float64, len(rates)) + } + for k, v := range rates { + if v < 0 { + v = 0 + } + if v > 1 { + v = 1 + } + rbs.rates[k] = v + } + for k := range rbs.rates { + if _, ok := rates[k]; !ok { + delete(rbs.rates, k) + } + } +} + +// GetAll returns all sampling rates for all services. +func (rbs *RateByService) GetAll() map[string]float64 { + rbs.mutex.RLock() + defer rbs.mutex.RUnlock() + + ret := make(map[string]float64, len(rbs.rates)) + for k, v := range rbs.rates { + ret[k] = v + } + + return ret +} diff --git a/config/ratebyservice_test.go b/config/ratebyservice_test.go new file mode 100644 index 000000000..05b0fd31f --- /dev/null +++ b/config/ratebyservice_test.go @@ -0,0 +1,60 @@ +package config + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRateByServiceGetSet(t *testing.T) { + assert := assert.New(t) + + var rbc RateByService + testCases := []map[string]float64{ + {"service:,env:": 0.1}, + {"service:,env:": 0.5, "service:mcnulty,env:dev": 0.7, "service:postgres,env:dev": 0.2}, + {}, + {"service:,env:": 0.2}, + } + + for _, tc := range testCases { + rbc.SetAll(tc) + assert.Equal(tc, rbc.GetAll()) + } +} + +func TestRateByServiceLimits(t *testing.T) { + assert := assert.New(t) + + var rbc RateByService + rbc.SetAll(map[string]float64{"service:high,env:": 2, "service:low,env:": -1}) + assert.Equal(map[string]float64{"service:high,env:": 1, "service:low,env:": 0}, rbc.GetAll()) +} + +func TestRateByServiceConcurrency(t *testing.T) { + assert := assert.New(t) + + var rbc RateByService + + const n = 1000 + var wg sync.WaitGroup + wg.Add(2) + + rbc.SetAll(map[string]float64{"service:mcnulty,env:test": 1}) + go func() { + for i := 0; i < n; i++ { + rate := float64(i) / float64(n) + rbc.SetAll(map[string]float64{"service:mcnulty,env:test": rate}) + } + wg.Done() + }() 
+ go func() { + for i := 0; i < n; i++ { + rates := rbc.GetAll() + _, ok := rates["service:mcnulty,env:test"] + assert.True(ok, "key should be here") + } + wg.Done() + }() +} diff --git a/sampler/backend.go b/sampler/backend.go index 2dfb34eba..d36ab2935 100644 --- a/sampler/backend.go +++ b/sampler/backend.go @@ -17,7 +17,7 @@ type Backend struct { totalScore float64 // Score of sampled traces sampledScore float64 - mu sync.Mutex + mu sync.RWMutex // Every decayPeriod, decay the score // Lower value is more reactive, but forgets quicker @@ -88,27 +88,40 @@ func (b *Backend) CountSample() { // GetSignatureScore returns the score of a signature. // It is normalized to represent a number of signatures per second. func (b *Backend) GetSignatureScore(signature Signature) float64 { - b.mu.Lock() + b.mu.RLock() score := b.scores[signature] / b.countScaleFactor - b.mu.Unlock() + b.mu.RUnlock() return score } +// GetAllSignatureScores returns the scores for all signatures. +// It is normalized to represent a number of signatures per second. +func (b *Backend) GetAllSignatureScores() map[Signature]float64 { + b.mu.RLock() + scores := make(map[Signature]float64, len(b.scores)) + for signature, score := range b.scores { + scores[signature] = score / b.countScaleFactor + } + b.mu.RUnlock() + + return scores +} + // GetSampledScore returns the global score of all sampled traces. func (b *Backend) GetSampledScore() float64 { - b.mu.Lock() + b.mu.RLock() score := b.sampledScore / b.countScaleFactor - b.mu.Unlock() + b.mu.RUnlock() return score } // GetTotalScore returns the global score of all sampled traces. 
func (b *Backend) GetTotalScore() float64 { - b.mu.Lock() + b.mu.RLock() score := b.totalScore / b.countScaleFactor - b.mu.Unlock() + b.mu.RUnlock() return score } diff --git a/sampler/catalog.go b/sampler/catalog.go new file mode 100644 index 000000000..5ba04a00d --- /dev/null +++ b/sampler/catalog.go @@ -0,0 +1,36 @@ +package sampler + +import ( + "github.com/DataDog/datadog-trace-agent/model" +) + +const defaultServiceRateKey = "service:,env:" + +type serviceKeyCatalog map[string]Signature + +func byServiceKey(service, env string) string { + return "service:" + service + ",env:" + env +} + +func newServiceKeyCatalog() serviceKeyCatalog { + return serviceKeyCatalog(make(map[string]Signature)) +} + +func (cat serviceKeyCatalog) register(root *model.Span, env string, sig Signature) { + map[string]Signature(cat)[byServiceKey(root.Service, env)] = sig +} + +func (cat serviceKeyCatalog) getRateByService(rates map[Signature]float64, totalScore float64) map[string]float64 { + rbs := make(map[string]float64, len(rates)+1) + for key, sig := range map[string]Signature(cat) { + if rate, ok := rates[sig]; ok { + rbs[key] = rate + } else { + // Backend, with its decay mechanism, should automatically remove the entries + // which have such a low value that they don't matter any more.
+ delete(cat, key) + } + } + rbs[defaultServiceRateKey] = totalScore + return rbs +} diff --git a/sampler/catalog_test.go b/sampler/catalog_test.go new file mode 100644 index 000000000..aab3761e2 --- /dev/null +++ b/sampler/catalog_test.go @@ -0,0 +1,86 @@ +package sampler + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestByServiceKey(t *testing.T) { + assert := assert.New(t) + + assert.Equal(defaultServiceRateKey, byServiceKey("", "")) + assert.Equal("service:mcnulty,env:test", byServiceKey("mcnulty", "test")) +} + +func TestNewServiceKeyCatalog(t *testing.T) { + assert := assert.New(t) + + cat := newServiceKeyCatalog() + assert.NotNil(cat) + assert.Equal(map[string]Signature{}, map[string]Signature(cat)) +} + +func TestServiceKeyCatalogRegister(t *testing.T) { + assert := assert.New(t) + + cat := newServiceKeyCatalog() + s := getTestPriorityEngine() + + _, root1 := getTestTraceWithService(t, "service1", s) + sig1 := computeServiceSignature(root1, defaultEnv) + cat.register(root1, defaultEnv, sig1) + assert.Equal(map[string]Signature{"service:service1,env:none": sig1}, map[string]Signature(cat)) + + _, root2 := getTestTraceWithService(t, "service2", s) + sig2 := computeServiceSignature(root2, defaultEnv) + cat.register(root2, defaultEnv, sig2) + assert.Equal(map[string]Signature{ + "service:service1,env:none": sig1, + "service:service2,env:none": sig2, + }, map[string]Signature(cat)) +} + +func TestServiceKeyCatalogGetRateByService(t *testing.T) { + assert := assert.New(t) + + cat := newServiceKeyCatalog() + s := getTestPriorityEngine() + + _, root1 := getTestTraceWithService(t, "service1", s) + sig1 := computeServiceSignature(root1, defaultEnv) + cat.register(root1, defaultEnv, sig1) + _, root2 := getTestTraceWithService(t, "service2", s) + sig2 := computeServiceSignature(root2, defaultEnv) + cat.register(root2, defaultEnv, sig2) + + rates := map[Signature]float64{ + sig1: 0.3, + sig2: 0.7, + } + const totalRate = 0.2 + + var 
rateByService map[string]float64 + + rateByService = cat.getRateByService(rates, totalRate) + assert.Equal(map[string]float64{ + "service:service1,env:none": 0.3, + "service:service2,env:none": 0.7, + "service:,env:": 0.2, + }, rateByService) + + delete(rates, sig1) + + rateByService = cat.getRateByService(rates, totalRate) + assert.Equal(map[string]float64{ + "service:service2,env:none": 0.7, + "service:,env:": 0.2, + }, rateByService) + + delete(rates, sig2) + + rateByService = cat.getRateByService(rates, totalRate) + assert.Equal(map[string]float64{ + "service:,env:": 0.2, + }, rateByService) +} diff --git a/sampler/sampler.go b/sampler/coresampler.go similarity index 74% rename from sampler/sampler.go rename to sampler/coresampler.go index 11ecb12e2..a784a906a 100644 --- a/sampler/sampler.go +++ b/sampler/coresampler.go @@ -30,6 +30,18 @@ const ( defaultSignatureScoreSlope float64 = 3 ) +// Engine is a common basic interface for sampler engines. +type Engine interface { + // Run the sampler. + Run() + // Stop the sampler. + Stop() + // Sample a trace. + Sample(trace model.Trace, root *model.Span, env string) bool + // GetState returns information about the sampler. 
+ GetState() interface{} +} + // Sampler is the main component of the sampling logic type Sampler struct { // Storage of the state of the sampler @@ -51,12 +63,10 @@ type Sampler struct { exit chan struct{} } -// NewSampler returns an initialized Sampler -func NewSampler(extraRate float64, maxTPS float64) *Sampler { - decayPeriod := defaultDecayPeriod - +// newSampler returns an initialized Sampler +func newSampler(extraRate float64, maxTPS float64) *Sampler { s := &Sampler{ - Backend: NewBackend(decayPeriod), + Backend: NewBackend(defaultDecayPeriod), extraRate: extraRate, maxTPS: maxTPS, @@ -115,38 +125,6 @@ func (s *Sampler) RunAdjustScoring() { } } -// Sample counts an incoming trace and tells if it is a sample which has to be kept -func (s *Sampler) Sample(trace model.Trace, root *model.Span, env string) bool { - // Extra safety, just in case one trace is empty - if len(trace) == 0 { - return false - } - - signature := ComputeSignatureWithRootAndEnv(trace, root, env) - - // Update sampler state by counting this trace - s.Backend.CountSignature(signature) - - sampleRate := s.GetSampleRate(trace, root, signature) - - sampled := ApplySampleRate(root, sampleRate) - - if sampled { - // Count the trace to allow us to check for the maxTPS limit. - // It has to happen before the maxTPS sampling. - s.Backend.CountSample() - - // Check for the maxTPS limit, and if we require an extra sampling. - // No need to check if we already decided not to keep the trace. - maxTPSrate := s.GetMaxTPSSampleRate() - if maxTPSrate < 1 { - sampled = ApplySampleRate(root, maxTPSrate) - } - } - - return sampled -} - // GetSampleRate returns the sample rate to apply to a trace. 
func (s *Sampler) GetSampleRate(trace model.Trace, root *model.Span, signature Signature) float64 { sampleRate := s.GetSignatureSampleRate(signature) * s.extraRate @@ -168,18 +146,6 @@ func (s *Sampler) GetMaxTPSSampleRate() float64 { return maxTPSrate } -// ApplySampleRate applies a sample rate over a trace root, returning if the trace should be sampled or not. -// It takes into account any previous sampling. -func ApplySampleRate(root *model.Span, sampleRate float64) bool { - initialRate := GetTraceAppliedSampleRate(root) - newRate := initialRate * sampleRate - SetTraceAppliedSampleRate(root, newRate) - - traceID := root.TraceID - - return SampleByRate(traceID, newRate) -} - // GetTraceAppliedSampleRate gets the sample rate the sample rate applied earlier in the pipeline. func GetTraceAppliedSampleRate(root *model.Span) float64 { if rate, ok := root.Metrics[model.SpanSampleRateMetricKey]; ok { diff --git a/sampler/coresampler_test.go b/sampler/coresampler_test.go new file mode 100644 index 000000000..32a039e78 --- /dev/null +++ b/sampler/coresampler_test.go @@ -0,0 +1,41 @@ +package sampler + +import ( + "testing" + "time" + + log "github.com/cihub/seelog" + + "github.com/stretchr/testify/assert" +) + +func getTestSampler() *Sampler { + // Disable debug logs in these tests + log.UseLogger(log.Disabled) + + // No extra fixed sampling, no maximum TPS + extraRate := 1.0 + maxTPS := 0.0 + + return newSampler(extraRate, maxTPS) +} + +func TestSamplerLoop(t *testing.T) { + s := getTestSampler() + + exit := make(chan bool) + + go func() { + s.Run() + close(exit) + }() + + s.Stop() + + select { + case <-exit: + return + case <-time.After(time.Second * 1): + assert.Fail(t, "Sampler took more than 1 second to close") + } +} diff --git a/sampler/prioritysampler.go b/sampler/prioritysampler.go new file mode 100644 index 000000000..10e2403ac --- /dev/null +++ b/sampler/prioritysampler.go @@ -0,0 +1,128 @@ +// Package sampler contains all the logic of the agent-side 
trace sampling
+//
+// Currently, the implementation is based on the scoring of the "signature" of each trace
+// Based on the score, we get a sample rate to apply to the given trace
+//
+// Current score implementation is super-simple, it is a counter with polynomial decay per signature.
+// We increment it for each incoming trace then we periodically divide the score by two every X seconds.
+// Right after the division, the score is an approximation of the number of received signatures over X seconds.
+// It is different from the scoring in the Agent.
+//
+// Since the sampling can happen at different levels (client, agent, server) or depending on different rules,
+// we have to track the sample rate applied at previous steps. This way, sampling twice at 50% can result in an
+// effective 25% sampling. The rate is stored as a metric in the trace root.
+package sampler
+
+import (
+	"sync"
+	"time"
+
+	"github.com/DataDog/datadog-trace-agent/config"
+	"github.com/DataDog/datadog-trace-agent/model"
+)
+
+const (
+	samplingPriorityKey = "_sampling_priority_v1"
+	syncPeriod          = 3 * time.Second
+)
+
+// PriorityEngine is the main component of the sampling logic
+type PriorityEngine struct {
+	// Sampler is the underlying sampler used by this engine, sharing logic among various engines.
+ Sampler *Sampler + + rateByService *config.RateByService + catalog serviceKeyCatalog + catalogMu sync.Mutex + + exit chan struct{} +} + +// NewPriorityEngine returns an initialized Sampler +func NewPriorityEngine(extraRate float64, maxTPS float64, rateByService *config.RateByService) *PriorityEngine { + s := &PriorityEngine{ + Sampler: newSampler(extraRate, maxTPS), + rateByService: rateByService, + catalog: newServiceKeyCatalog(), + exit: make(chan struct{}), + } + + return s +} + +// Run runs and block on the Sampler main loop +func (s *PriorityEngine) Run() { + var wg sync.WaitGroup + wg.Add(2) + + go func() { + s.Sampler.Run() + wg.Done() + }() + + go func() { + t := time.NewTicker(syncPeriod) + defer t.Stop() + + for { + select { + case <-t.C: + s.rateByService.SetAll(s.getRateByService()) + case <-s.exit: + wg.Done() + return + } + } + }() + + wg.Wait() +} + +// Stop stops the main Run loop +func (s *PriorityEngine) Stop() { + s.Sampler.Stop() + close(s.exit) +} + +// Sample counts an incoming trace and tells if it is a sample which has to be kept +func (s *PriorityEngine) Sample(trace model.Trace, root *model.Span, env string) bool { + // Extra safety, just in case one trace is empty + if len(trace) == 0 { + return false + } + + // Regardless of rates, sampling here is based on the metadata set + // by the client library. Which, is turn, is based on agent hints, + // but the rule of thumb is: respect client choice. + sampled := root.Metrics[samplingPriorityKey] > 0 + + signature := computeServiceSignature(root, env) + s.catalogMu.Lock() + s.catalog.register(root, env, signature) + s.catalogMu.Unlock() + + // Update sampler state by counting this trace + s.Sampler.Backend.CountSignature(signature) + + if sampled { + // Count the trace to allow us to check for the maxTPS limit. + // It has to happen before the maxTPS sampling. 
+ s.Sampler.Backend.CountSample() + } + + return sampled +} + +// GetState collects and return internal statistics and coefficients for indication purposes +// It returns an interface{}, as other samplers might return other informations. +func (s *PriorityEngine) GetState() interface{} { + return s.Sampler.GetState() +} + +// getRateByService returns all rates by service, this information is useful for +// agents to pick the right service rate. +func (s *PriorityEngine) getRateByService() map[string]float64 { + defer s.catalogMu.Unlock() + s.catalogMu.Lock() + return s.catalog.getRateByService(s.Sampler.GetAllSignatureSampleRates(), s.Sampler.GetDefaultSampleRate()) +} diff --git a/sampler/prioritysampler_test.go b/sampler/prioritysampler_test.go new file mode 100644 index 000000000..ebff4555c --- /dev/null +++ b/sampler/prioritysampler_test.go @@ -0,0 +1,137 @@ +package sampler + +import ( + "math" + "math/rand" + "testing" + + log "github.com/cihub/seelog" + + "github.com/DataDog/datadog-trace-agent/config" + "github.com/DataDog/datadog-trace-agent/model" + "github.com/stretchr/testify/assert" +) + +const ( + testServiceA = "service-a" + testServiceB = "service-b" +) + +func getTestPriorityEngine() *PriorityEngine { + // Disable debug logs in these tests + log.UseLogger(log.Disabled) + + // No extra fixed sampling, no maximum TPS + extraRate := 1.0 + maxTPS := 0.0 + + rateByService := config.RateByService{} + return NewPriorityEngine(extraRate, maxTPS, &rateByService) +} + +func getTestTraceWithService(t *testing.T, service string, s *PriorityEngine) (model.Trace, *model.Span) { + tID := randomTraceID() + trace := model.Trace{ + model.Span{TraceID: tID, SpanID: 1, ParentID: 0, Start: 42, Duration: 1000000, Service: service, Type: "web", Meta: map[string]string{"env": defaultEnv}}, + model.Span{TraceID: tID, SpanID: 2, ParentID: 1, Start: 100, Duration: 200000, Service: service, Type: "sql"}, + } + r := rand.Float64() + priority := 0.0 + rates := 
s.getRateByService()
+	key := byServiceKey(trace[0].Service, defaultEnv)
+	var rate float64
+	if r, ok := rates[key]; ok {
+		rate = r
+	} else {
+		rate = 1
+	}
+	if r <= rate {
+		priority = 1
+	}
+	trace[0].Metrics = map[string]float64{samplingPriorityKey: priority}
+	return trace, &trace[0]
+}
+
+func TestMaxTPSByService(t *testing.T) {
+	// Test the "effectiveness" of the maxTPS option.
+	assert := assert.New(t)
+	s := getTestPriorityEngine()
+
+	type testCase struct {
+		maxTPS        float64
+		tps           float64
+		relativeError float64
+	}
+	testCases := []testCase{
+		{maxTPS: 10.0, tps: 20.0, relativeError: 0.2},
+	}
+	if !testing.Short() {
+		testCases = append(testCases,
+			testCase{maxTPS: 5.0, tps: 50.0, relativeError: 0.2},
+			testCase{maxTPS: 3.0, tps: 200.0, relativeError: 0.2},
+			testCase{maxTPS: 1.0, tps: 1000.0, relativeError: 0.2},
+			testCase{maxTPS: 10.0, tps: 10.0, relativeError: 0.001},
+			testCase{maxTPS: 10.0, tps: 3.0, relativeError: 0.001})
+	}
+
+	// To avoid the edge effects from a non-initialized sampler, wait a bit before counting samples.
+	const (
+		initPeriods = 50
+		periods     = 500
+	)
+
+	for _, tc := range testCases {
+		t.Logf("testing maxTPS=%0.1f tps=%0.1f", tc.maxTPS, tc.tps)
+		s.Sampler.maxTPS = tc.maxTPS
+		periodSeconds := s.Sampler.Backend.decayPeriod.Seconds()
+		tracesPerPeriod := tc.tps * periodSeconds
+		// Set signature score offset high enough not to kick in during the test.
+		s.Sampler.signatureScoreOffset = 2 * tc.tps
+		s.Sampler.signatureScoreFactor = math.Pow(s.Sampler.signatureScoreSlope, math.Log10(s.Sampler.signatureScoreOffset))
+
+		sampledCount := 0
+		handledCount := 0
+
+		for period := 0; period < initPeriods+periods; period++ {
+			s.Sampler.Backend.DecayScore()
+			s.Sampler.AdjustScoring()
+			for i := 0; i < int(tracesPerPeriod); i++ {
+				trace, root := getTestTraceWithService(t, "service-a", s)
+				sampled := s.Sample(trace, root, defaultEnv)
+				// Once we got into the "supposed-to-be" stable "regime", count the samples
+				if period > initPeriods {
+					handledCount++
+					if sampled {
+						sampledCount++
+					}
+				}
+			}
+		}
+
+		// When tps is lower than maxTPS it means that we are actually not sampling
+		// anything, so the target is the original tps, and not maxTPS.
+		// Also, in that case, results should be more precise.
+		targetTPS := tc.maxTPS
+		relativeError := 0.01
+		if tc.maxTPS > tc.tps {
+			targetTPS = tc.tps
+		} else {
+			relativeError = 0.1 + s.Sampler.Backend.decayFactor - 1
+		}
+
+		// Check that the sampled score is roughly equal to maxTPS. This is different from
+		// the score sampler test as here we run adjustscoring on a regular basis so it converges to maxTPS.
+		assert.InEpsilon(targetTPS, s.Sampler.Backend.GetSampledScore(), relativeError)
+
+		// We should have kept the right percentage of traces
+		assert.InEpsilon(targetTPS/tc.tps, float64(sampledCount)/float64(handledCount), relativeError)
+
+		// We should have a throughput of sampled traces around maxTPS
+		// Check for 1% epsilon, but the precision also depends on the backend imprecision (error factor = decayFactor).
+		// Combine error rates with L1-norm instead of L2-norm by laziness, still good enough for tests.
+		assert.InEpsilon(targetTPS, float64(sampledCount)/(float64(periods)*periodSeconds), relativeError)
+	}
+}
+
+// Ensure PriorityEngine implements Engine.
+var testPriorityEngine Engine = &PriorityEngine{} diff --git a/sampler/score.go b/sampler/score.go index 973d8019b..c35fbfe35 100644 --- a/sampler/score.go +++ b/sampler/score.go @@ -21,23 +21,59 @@ func SampleByRate(traceID uint64, sampleRate float64) bool { return true } -// GetSignatureSampleRate gives the sample rate to apply to any signature -// For now, only based on count score +func capTo1(f float64) float64 { + if f > 1 { + return 1 + } + return f +} + +// GetSignatureSampleRate gives the sample rate to apply to any signature. +// For now, only based on count score. func (s *Sampler) GetSignatureSampleRate(signature Signature) float64 { - score := s.GetCountScore(signature) + return capTo1(s.GetCountScore(signature)) +} - if score > 1 { - score = 1.0 +// GetAllSignatureSampleRates gives the sample rate to apply to all signatures. +// For now, only based on count score. +func (s *Sampler) GetAllSignatureSampleRates() map[Signature]float64 { + m := s.GetAllCountScores() + for k, v := range m { + m[k] = capTo1(v) } + return m +} + +// GetDefaultSampleRate gives the sample rate to apply to an unknown signature. +// For now, only based on count score. 
+func (s *Sampler) GetDefaultSampleRate() float64 {
+	return capTo1(s.GetDefaultCountScore())
+}
 
-	return score
+func (s *Sampler) backendScoreToSamplerScore(score float64) float64 {
+	return s.signatureScoreFactor / math.Pow(s.signatureScoreSlope, math.Log10(score))
 }
 
 // GetCountScore scores any signature based on its recent throughput
 // The score value can be seeing as the sample rate if the count were the only factor
 // Since other factors can intervene (such as extra global sampling), its value can be larger than 1
 func (s *Sampler) GetCountScore(signature Signature) float64 {
-	score := s.Backend.GetSignatureScore(signature)
+	return s.backendScoreToSamplerScore(s.Backend.GetSignatureScore(signature))
+}
 
-	return s.signatureScoreFactor / math.Pow(s.signatureScoreSlope, math.Log10(score))
+// GetAllCountScores scores all signatures based on their recent throughput
+// The score value can be seen as the sample rate if the count were the only factor
+// Since other factors can intervene (such as extra global sampling), its value can be larger than 1
+func (s *Sampler) GetAllCountScores() map[Signature]float64 {
+	m := s.Backend.GetAllSignatureScores()
+	for k, v := range m {
+		m[k] = s.backendScoreToSamplerScore(v)
+	}
+	return m
+}
+
+// GetDefaultCountScore returns a default score when not knowing the signature for real.
+// Since other factors can intervene (such as extra global sampling), its value can be larger than 1
+func (s *Sampler) GetDefaultCountScore() float64 {
+	return s.backendScoreToSamplerScore(s.Backend.GetTotalScore())
 }
diff --git a/sampler/scoresampler.go b/sampler/scoresampler.go
new file mode 100644
index 000000000..9b8dc39cc
--- /dev/null
+++ b/sampler/scoresampler.go
@@ -0,0 +1,91 @@
+// Package sampler contains all the logic of the agent-side trace sampling
+//
+// Currently, the implementation is based on the scoring of the "signature" of each trace
+// Based on the score, we get a sample rate to apply to the given trace
+//
+// Current score implementation is super-simple, it is a counter with polynomial decay per signature.
+// We increment it for each incoming trace then we periodically divide the score by two every X seconds.
+// Right after the division, the score is an approximation of the number of received signatures over X seconds.
+// It is different from the scoring in the Agent.
+//
+// Since the sampling can happen at different levels (client, agent, server) or depending on different rules,
+// we have to track the sample rate applied at previous steps. This way, sampling twice at 50% can result in an
+// effective 25% sampling. The rate is stored as a metric in the trace root.
+package sampler
+
+import (
+	"github.com/DataDog/datadog-trace-agent/model"
+)
+
+// ScoreEngine is the main component of the sampling logic
+type ScoreEngine struct {
+	// Sampler is the underlying sampler used by this engine, sharing logic among various engines.
+ Sampler *Sampler +} + +// NewScoreEngine returns an initialized Sampler +func NewScoreEngine(extraRate float64, maxTPS float64) *ScoreEngine { + s := &ScoreEngine{ + Sampler: newSampler(extraRate, maxTPS), + } + + return s +} + +// Run runs and block on the Sampler main loop +func (s *ScoreEngine) Run() { + s.Sampler.Run() +} + +// Stop stops the main Run loop +func (s *ScoreEngine) Stop() { + s.Sampler.Stop() +} + +func applySampleRate(root *model.Span, sampleRate float64) bool { + initialRate := GetTraceAppliedSampleRate(root) + newRate := initialRate * sampleRate + SetTraceAppliedSampleRate(root, newRate) + + traceID := root.TraceID + + return SampleByRate(traceID, newRate) +} + +// Sample counts an incoming trace and tells if it is a sample which has to be kept +func (s *ScoreEngine) Sample(trace model.Trace, root *model.Span, env string) bool { + // Extra safety, just in case one trace is empty + if len(trace) == 0 { + return false + } + + signature := computeSignatureWithRootAndEnv(trace, root, env) + + // Update sampler state by counting this trace + s.Sampler.Backend.CountSignature(signature) + + sampleRate := s.Sampler.GetSampleRate(trace, root, signature) + + sampled := applySampleRate(root, sampleRate) + + if sampled { + // Count the trace to allow us to check for the maxTPS limit. + // It has to happen before the maxTPS sampling. + s.Sampler.Backend.CountSample() + + // Check for the maxTPS limit, and if we require an extra sampling. + // No need to check if we already decided not to keep the trace. + maxTPSrate := s.Sampler.GetMaxTPSSampleRate() + if maxTPSrate < 1 { + sampled = applySampleRate(root, maxTPSrate) + } + } + + return sampled +} + +// GetState collects and return internal statistics and coefficients for indication purposes +// It returns an interface{}, as other samplers might return other informations. 
+func (s *ScoreEngine) GetState() interface{} { + return s.Sampler.GetState() +} diff --git a/sampler/sampler_test.go b/sampler/scoresampler_test.go similarity index 70% rename from sampler/sampler_test.go rename to sampler/scoresampler_test.go index 99b536bff..3f2ce3d90 100644 --- a/sampler/sampler_test.go +++ b/sampler/scoresampler_test.go @@ -4,7 +4,6 @@ import ( "math" "math/rand" "testing" - "time" log "github.com/cihub/seelog" @@ -14,7 +13,7 @@ import ( const defaultEnv = "none" -func getTestSampler() *Sampler { +func getTestScoreEngine() *ScoreEngine { // Disable debug logs in these tests log.UseLogger(log.Disabled) @@ -22,7 +21,7 @@ func getTestSampler() *Sampler { extraRate := 1.0 maxTPS := 0.0 - return NewSampler(extraRate, maxTPS) + return NewScoreEngine(extraRate, maxTPS) } func getTestTrace() (model.Trace, *model.Span) { @@ -34,50 +33,30 @@ func getTestTrace() (model.Trace, *model.Span) { return trace, &trace[0] } -func TestSamplerLoop(t *testing.T) { - s := getTestSampler() - - exit := make(chan bool) - - go func() { - s.Run() - close(exit) - }() - - s.Stop() - - select { - case <-exit: - return - case <-time.After(time.Second * 1): - assert.Fail(t, "Sampler took more than 1 second to close") - } -} - func TestExtraSampleRate(t *testing.T) { assert := assert.New(t) - s := getTestSampler() + s := getTestScoreEngine() trace, root := getTestTrace() - signature := ComputeSignature(trace) + signature := testComputeSignature(trace) // Feed the s with a signature so that it has a < 1 sample rate for i := 0; i < int(1e6); i++ { s.Sample(trace, root, defaultEnv) } - sRate := s.GetSampleRate(trace, root, signature) + sRate := s.Sampler.GetSampleRate(trace, root, signature) // Then turn on the extra sample rate, then ensure it affects both existing and new signatures - s.extraRate = 0.33 + s.Sampler.extraRate = 0.33 - assert.Equal(s.GetSampleRate(trace, root, signature), s.extraRate*sRate) + assert.Equal(s.Sampler.GetSampleRate(trace, root, signature), 
s.Sampler.extraRate*sRate) } func TestMaxTPS(t *testing.T) { // Test the "effectiveness" of the maxTPS option. assert := assert.New(t) - s := getTestSampler() + s := getTestScoreEngine() maxTPS := 5.0 tps := 100.0 @@ -85,17 +64,17 @@ func TestMaxTPS(t *testing.T) { initPeriods := 20 periods := 50 - s.maxTPS = maxTPS - periodSeconds := s.Backend.decayPeriod.Seconds() + s.Sampler.maxTPS = maxTPS + periodSeconds := s.Sampler.Backend.decayPeriod.Seconds() tracesPerPeriod := tps * periodSeconds // Set signature score offset high enough not to kick in during the test. - s.signatureScoreOffset = 2 * tps - s.signatureScoreFactor = math.Pow(s.signatureScoreSlope, math.Log10(s.signatureScoreOffset)) + s.Sampler.signatureScoreOffset = 2 * tps + s.Sampler.signatureScoreFactor = math.Pow(s.Sampler.signatureScoreSlope, math.Log10(s.Sampler.signatureScoreOffset)) sampledCount := 0 for period := 0; period < initPeriods+periods; period++ { - s.Backend.DecayScore() + s.Sampler.Backend.DecayScore() for i := 0; i < int(tracesPerPeriod); i++ { trace, root := getTestTrace() sampled := s.Sample(trace, root, defaultEnv) @@ -107,21 +86,21 @@ func TestMaxTPS(t *testing.T) { } // Check that the sampled score pre-maxTPS is equals to the incoming number of traces per second - assert.InEpsilon(tps, s.Backend.GetSampledScore(), 0.01) + assert.InEpsilon(tps, s.Sampler.Backend.GetSampledScore(), 0.01) // We should have kept less traces per second than maxTPS - assert.True(s.maxTPS >= float64(sampledCount)/(float64(periods)*periodSeconds)) + assert.True(s.Sampler.maxTPS >= float64(sampledCount)/(float64(periods)*periodSeconds)) // We should have a throughput of sampled traces around maxTPS // Check for 1% epsilon, but the precision also depends on the backend imprecision (error factor = decayFactor). // Combine error rates with L1-norm instead of L2-norm by laziness, still good enough for tests. 
- assert.InEpsilon(s.maxTPS, float64(sampledCount)/(float64(periods)*periodSeconds), - 0.01+s.Backend.decayFactor-1) + assert.InEpsilon(s.Sampler.maxTPS, float64(sampledCount)/(float64(periods)*periodSeconds), + 0.01+s.Sampler.Backend.decayFactor-1) } func TestSamplerChainedSampling(t *testing.T) { assert := assert.New(t) - s := getTestSampler() + s := getTestScoreEngine() trace, _ := getTestTrace() @@ -132,7 +111,7 @@ func TestSamplerChainedSampling(t *testing.T) { assert.Equal(0.8, GetTraceAppliedSampleRate(root)) // Sample again with an ensured rate, rates should be combined - s.extraRate = 0.5 + s.Sampler.extraRate = 0.5 s.Sample(trace, root, defaultEnv) assert.Equal(0.4, GetTraceAppliedSampleRate(root)) @@ -141,13 +120,26 @@ func TestSamplerChainedSampling(t *testing.T) { assert.Equal(0.4, GetTraceAppliedSampleRate(rootAgain)) } +func TestApplySampleRate(t *testing.T) { + assert := assert.New(t) + tID := randomTraceID() + + root := model.Span{TraceID: tID, SpanID: 1, ParentID: 0, Start: 123, Duration: 100000, Service: "mcnulty", Type: "web"} + + applySampleRate(&root, 0.4) + assert.Equal(0.4, root.Metrics["_sample_rate"], "sample rate should be 40%%") + + applySampleRate(&root, 0.5) + assert.Equal(0.2, root.Metrics["_sample_rate"], "sample rate should be 20%% (50%% of 40%%)") +} + func BenchmarkSampler(b *testing.B) { // Benchmark the resource consumption of many traces sampling // Up to signatureCount different signatures signatureCount := 20 - s := getTestSampler() + s := getTestScoreEngine() b.ResetTimer() b.ReportAllocs() @@ -163,3 +155,6 @@ func BenchmarkSampler(b *testing.B) { s.Sample(trace, &trace[0], defaultEnv) } } + +// Ensure ScoreEngine implements engine. 
+var testScoreEngine Engine = &ScoreEngine{} diff --git a/sampler/signature.go b/sampler/signature.go index 57cf94fdf..5834ff3d0 100644 --- a/sampler/signature.go +++ b/sampler/signature.go @@ -10,10 +10,20 @@ import ( // Signature is a simple representation of trace, used to identify simlar traces type Signature uint64 -// ComputeSignatureWithRootAndEnv generates the signature of a trace knowing its root +// spanHash is the type of the hashes used during the computation of a signature +// Use FNV for hashing since it is super-cheap and we have no cryptographic needs +type spanHash uint32 +type spanHashSlice []spanHash + +func (p spanHashSlice) Len() int { return len(p) } +func (p spanHashSlice) Less(i, j int) bool { return p[i] < p[j] } +func (p spanHashSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func sortHashes(hashes []spanHash) { sort.Sort(spanHashSlice(hashes)) } + +// computeSignatureWithRootAndEnv generates the signature of a trace knowing its root // Signature based on the hash of (env, service, name, resource, is_error) for the root, plus the set of // (env, service, name, is_error) of each span. 
-func ComputeSignatureWithRootAndEnv(trace model.Trace, root *model.Span, env string) Signature { +func computeSignatureWithRootAndEnv(trace model.Trace, root *model.Span, env string) Signature { rootHash := computeRootHash(*root, env) spanHashes := make([]spanHash, 0, len(trace)) @@ -36,12 +46,12 @@ func ComputeSignatureWithRootAndEnv(trace model.Trace, root *model.Span, env str return Signature(traceHash) } -// ComputeSignature is the same as ComputeSignatureWithRoot, except that it finds the root itself -func ComputeSignature(trace model.Trace) Signature { - root := trace.GetRoot() - env := trace.GetEnv() - - return ComputeSignatureWithRootAndEnv(trace, root, env) +// computeServiceSignature generates the signature of a trace with minimal +// information such as service and env, this is typically used by distributed +// sampling based on priority, and used as a key to store the desired rate +// for a given service,env tuple. +func computeServiceSignature(root *model.Span, env string) Signature { + return Signature(computeServiceHash(*root, env)) } func computeSpanHash(span model.Span, env string) spanHash { @@ -65,12 +75,11 @@ func computeRootHash(span model.Span, env string) spanHash { return spanHash(h.Sum32()) } -// spanHash is the type of the hashes used during the computation of a signature -// Use FNV for hashing since it is super-cheap and we have no cryptographic needs -type spanHash uint32 -type spanHashSlice []spanHash +func computeServiceHash(span model.Span, env string) spanHash { + h := fnv.New32a() + h.Write([]byte(span.Service)) + h.Write([]byte{','}) + h.Write([]byte(env)) -func (p spanHashSlice) Len() int { return len(p) } -func (p spanHashSlice) Less(i, j int) bool { return p[i] < p[j] } -func (p spanHashSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -func sortHashes(hashes []spanHash) { sort.Sort(spanHashSlice(hashes)) } + return spanHash(h.Sum32()) +} diff --git a/sampler/signature_test.go b/sampler/signature_test.go index 
9f67645f3..de8ea5786 100644 --- a/sampler/signature_test.go +++ b/sampler/signature_test.go @@ -7,8 +7,15 @@ import ( "github.com/stretchr/testify/assert" ) +func testComputeSignature(trace model.Trace) Signature { + root := trace.GetRoot() + env := trace.GetEnv() + return computeSignatureWithRootAndEnv(trace, root, env) +} + func TestSignatureSimilar(t *testing.T) { assert := assert.New(t) + t1 := model.Trace{ model.Span{TraceID: 101, SpanID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 26965}, model.Span{TraceID: 101, SpanID: 1012, ParentID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 197884}, @@ -21,11 +28,12 @@ func TestSignatureSimilar(t *testing.T) { model.Span{TraceID: 102, SpanID: 1023, ParentID: 1022, Service: "x2", Name: "y2", Resource: "z2", Duration: 349944}, } - assert.Equal(ComputeSignature(t1), ComputeSignature(t2)) + assert.Equal(testComputeSignature(t1), testComputeSignature(t2)) } func TestSignatureDifferentError(t *testing.T) { assert := assert.New(t) + t1 := model.Trace{ model.Span{TraceID: 101, SpanID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 26965}, model.Span{TraceID: 101, SpanID: 1012, ParentID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 197884}, @@ -38,11 +46,12 @@ func TestSignatureDifferentError(t *testing.T) { model.Span{TraceID: 110, SpanID: 1103, ParentID: 1101, Service: "x2", Name: "y2", Resource: "z2", Duration: 349944}, } - assert.NotEqual(ComputeSignature(t1), ComputeSignature(t2)) + assert.NotEqual(testComputeSignature(t1), testComputeSignature(t2)) } func TestSignatureDifferentRoot(t *testing.T) { assert := assert.New(t) + t1 := model.Trace{ model.Span{TraceID: 101, SpanID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 26965}, model.Span{TraceID: 101, SpanID: 1012, ParentID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 197884}, @@ -55,5 +64,64 @@ func TestSignatureDifferentRoot(t *testing.T) { model.Span{TraceID: 103, SpanID: 1033, 
ParentID: 1032, Service: "x1", Name: "y1", Resource: "z1", Duration: 152342344}, } - assert.NotEqual(ComputeSignature(t1), ComputeSignature(t2)) + assert.NotEqual(testComputeSignature(t1), testComputeSignature(t2)) +} + +func testComputeServiceSignature(trace model.Trace) Signature { + root := trace.GetRoot() + env := trace.GetEnv() + return computeServiceSignature(root, env) +} + +func TestServiceSignatureSimilar(t *testing.T) { + assert := assert.New(t) + + t1 := model.Trace{ + model.Span{TraceID: 101, SpanID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 26965}, + model.Span{TraceID: 101, SpanID: 1012, ParentID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 197884}, + model.Span{TraceID: 101, SpanID: 1013, ParentID: 1012, Service: "x1", Name: "y1", Resource: "z1", Duration: 12304982304}, + model.Span{TraceID: 101, SpanID: 1014, ParentID: 1013, Service: "x2", Name: "y2", Resource: "z2", Duration: 34384993}, + } + t2 := model.Trace{ + model.Span{TraceID: 102, SpanID: 1021, Service: "x1", Name: "y2", Resource: "z2", Duration: 992312}, + model.Span{TraceID: 102, SpanID: 1022, ParentID: 1021, Service: "x1", Name: "y1", Resource: "z1", Error: 1, Duration: 34347}, + model.Span{TraceID: 102, SpanID: 1023, ParentID: 1022, Service: "x2", Name: "y2", Resource: "z2", Duration: 349944}, + } + assert.Equal(testComputeServiceSignature(t1), testComputeServiceSignature(t2)) +} + +func TestServiceSignatureDifferentService(t *testing.T) { + assert := assert.New(t) + + t1 := model.Trace{ + model.Span{TraceID: 101, SpanID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 26965}, + model.Span{TraceID: 101, SpanID: 1012, ParentID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 197884}, + model.Span{TraceID: 101, SpanID: 1013, ParentID: 1012, Service: "x1", Name: "y1", Resource: "z1", Duration: 12304982304}, + model.Span{TraceID: 101, SpanID: 1014, ParentID: 1013, Service: "x2", Name: "y2", Resource: "z2", Duration: 34384993}, + } + t2 := 
model.Trace{ + model.Span{TraceID: 103, SpanID: 1031, Service: "x2", Name: "y1", Resource: "z1", Duration: 19207}, + model.Span{TraceID: 103, SpanID: 1032, ParentID: 1031, Service: "x1", Name: "y1", Resource: "z1", Duration: 234923874}, + model.Span{TraceID: 103, SpanID: 1033, ParentID: 1032, Service: "x1", Name: "y1", Resource: "z1", Duration: 152342344}, + } + + assert.NotEqual(testComputeServiceSignature(t1), testComputeServiceSignature(t2)) +} + +func TestServiceSignatureDifferentEnv(t *testing.T) { + assert := assert.New(t) + + t1 := model.Trace{ + model.Span{TraceID: 101, SpanID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 26965, Meta: map[string]string{"env": "test"}}, + model.Span{TraceID: 101, SpanID: 1012, ParentID: 1011, Service: "x1", Name: "y1", Resource: "z1", Duration: 197884}, + model.Span{TraceID: 101, SpanID: 1013, ParentID: 1012, Service: "x1", Name: "y1", Resource: "z1", Duration: 12304982304}, + model.Span{TraceID: 101, SpanID: 1014, ParentID: 1013, Service: "x2", Name: "y2", Resource: "z2", Duration: 34384993}, + } + t2 := model.Trace{ + model.Span{TraceID: 110, SpanID: 1101, Service: "x1", Name: "y1", Resource: "z1", Duration: 992312, Meta: map[string]string{"env": "prod"}}, + model.Span{TraceID: 110, SpanID: 1102, ParentID: 1101, Service: "x1", Name: "y1", Resource: "z1", Duration: 34347}, + model.Span{TraceID: 110, SpanID: 1103, ParentID: 1101, Service: "x2", Name: "y2", Resource: "z2", Duration: 349944}, + } + + assert.NotEqual(testComputeServiceSignature(t1), testComputeServiceSignature(t2)) } diff --git a/sampler/state.go b/sampler/state.go index 9cadc2d8d..5fac758f5 100644 --- a/sampler/state.go +++ b/sampler/state.go @@ -1,6 +1,6 @@ package sampler -// InternalState exposes all the main internal settings of the scope sampler +// InternalState exposes all the main internal settings of the score sampler type InternalState struct { Offset float64 Slope float64 @@ -13,11 +13,11 @@ type InternalState struct { // GetState 
collects and return internal statistics and coefficients for indication purposes func (s *Sampler) GetState() InternalState { return InternalState{ - s.signatureScoreOffset, - s.signatureScoreSlope, - s.Backend.GetCardinality(), - s.Backend.GetTotalScore(), - s.Backend.GetSampledScore(), - s.maxTPS, + Offset: s.signatureScoreOffset, + Slope: s.signatureScoreSlope, + Cardinality: s.Backend.GetCardinality(), + InTPS: s.Backend.GetTotalScore(), + OutTPS: s.Backend.GetSampledScore(), + MaxTPS: s.maxTPS, } }