[distributed sampling] introducing service sampler
[distributed sampling] introduced service sampler

[distributed sampling] added new channel to handle distributed traces

[distributed tracing] fixed stats status

[distributed sampling] test typo

[distributed sampling] added JSON output with rate by service

[distributed sampling] encapsulating response in a container (see the response sketch at the end of this changelog)

[distributed sampling] handling distributed traces for real, sending feedback to client lib

[distributed sampling] trusting client library info in by-service sampler

[distributed sampling] fixed agent tests

[distributed sampling] adding metrics and visual output about internal sampling rates

[distributed sampling] added sample rate feedback in -info command

[distributed sampling] added test for ApplyRate funcs

[distributed sampling] added test for service signature

[distributed sampling] replaced sampling.priority by _sampling_priority_v1

As this is still experimental, replacing sampling.priority with _sampling_priority_v1,
and also converting it from a meta to a metric. This makes the code easier to handle
and saves many string-to-number conversions; additionally, the _sampling_priority_v1
metric is naturally trimmed from the front-end UI because:
- a) it is a metric
- b) it starts with "_"
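
A minimal sketch of the new convention as a client library might apply it (hypothetical helper, not part of this commit; the agent reads the same key back from root.Metrics in the agent.go diff below):

    // setSamplingPriority marks the root span with the numeric priority
    // metric. Storing it as a metric rather than a meta tag means no
    // string-to-number conversion is needed when the agent reads it back.
    func setSamplingPriority(root *model.Span, priority float64) {
        if root.Metrics == nil {
            root.Metrics = make(map[string]float64)
        }
        root.Metrics["_sampling_priority_v1"] = priority
    }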

[distributed sampling] removed debug metric

[distributed sampling] using genuine engine type as key in info hash

[distributed sampling] fixed sampling on services (algo and tests)

[distributed tracing] DistributedSampler -> PrioritySampler

[distributed sampling] removing generic constructor

[distributed sampling] refactored samplers to share common code without hooks

[distributed sampling] using defers in info publishing code

[distributed sampling] choosing sampler depending on trace content

[distributed sampling] updated receiver tests

[distributed sampling] made sampler internal public again

[distributed sampling] fixed expvar to avoid dashboard migration

[distributed tracing] added metrics to follow sampling priority on receiver

[distributed sampling] returning a meaningful value for the default sample rate

[distributed sampling] returning something other than 1 for the default sampling value

[distributed sampling] removed debug message

[distributed sampling] enabling priority sampling by default

[distributed sampling] removing useless sample rate metric when priority sampling

[distributed sampling] removed pointless code

[distributed sampling] added docstring

[distributed sampling] stats rework (using tags for sampling priority)

[distributed sampling] typo fixes

[distributed sampling] refined '-info' output

This patch:

- shows "Sample Rate" instead of "Rate"
- removes the default sample rate, which is a fallback and should not be used in most cases

[distributed sampling] moved rate by service to config package in dynamic config

[distributed sampling] using dynamic config to report rate by service

[distributed sampling] refactored and rewrote tests
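
For reference, a sketch of the response container mentioned above ("added JSON output with rate by service", "encapsulating response in a container"). The field names here are assumptions, not taken from this commit; the map keys follow the "service:<name>,env:<env>" signature visible in the info.go diff below:

    // traceResponse is a hypothetical container for the feedback the agent
    // sends back to client libraries, mapping service signatures to rates,
    // e.g. {"rate_by_service": {"service:myapp,env:prod": 0.35}}.
    type traceResponse struct {
        RateByService map[string]float64 `json:"rate_by_service"`
    }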
ufoot committed Oct 10, 2017
1 parent e3d3525 commit 75bd88e
Showing 26 changed files with 1,324 additions and 315 deletions.
97 changes: 70 additions & 27 deletions agent/agent.go
@@ -15,8 +15,11 @@ import (
"github.com/DataDog/datadog-trace-agent/watchdog"
)

const processStatsInterval = time.Minute
const languageHeaderKey = "X-Datadog-Reported-Languages"
const (
processStatsInterval = time.Minute
languageHeaderKey = "X-Datadog-Reported-Languages"
samplingPriorityKey = "_sampling_priority_v1"
)

type processedTrace struct {
Trace model.Trace
@@ -34,14 +37,16 @@ func (pt *processedTrace) weight() float64 {

// Agent struct holds all the sub-routine structs and makes the data flow between them
type Agent struct {
Receiver *HTTPReceiver
Concentrator *Concentrator
Filters []filters.Filter
Sampler *Sampler
Writer *Writer
Receiver *HTTPReceiver
Concentrator *Concentrator
Filters []filters.Filter
ScoreEngine *Sampler
PriorityEngine *Sampler
Writer *Writer

// config
conf *config.AgentConfig
conf *config.AgentConfig
dynConf *config.DynamicConfig

// Used to synchronize on a clean exit
exit chan struct{}
@@ -53,26 +58,34 @@
func NewAgent(conf *config.AgentConfig) *Agent {
exit := make(chan struct{})

r := NewHTTPReceiver(conf)
dynConf := config.NewDynamicConfig()
r := NewHTTPReceiver(conf, dynConf)
c := NewConcentrator(
conf.ExtraAggregators,
conf.BucketInterval.Nanoseconds(),
)
f := filters.Setup(conf)
s := NewSampler(conf)
ss := NewScoreEngine(conf)
var ps *Sampler
if conf.PrioritySampling {
// Use priority sampling for distributed tracing only if conf says so
ps = NewPriorityEngine(conf, dynConf)
}

w := NewWriter(conf)
w.inServices = r.services

return &Agent{
Receiver: r,
Concentrator: c,
Filters: f,
Sampler: s,
Writer: w,
conf: conf,
exit: exit,
die: die,
Receiver: r,
Concentrator: c,
Filters: f,
ScoreEngine: ss,
PriorityEngine: ps,
Writer: w,
conf: conf,
dynConf: dynConf,
exit: exit,
die: die,
}
}

@@ -92,7 +105,10 @@ func (a *Agent) Run() {

a.Receiver.Run()
a.Writer.Run()
a.Sampler.Run()
a.ScoreEngine.Run()
if a.PriorityEngine != nil {
a.PriorityEngine.Run()
}

for {
select {
@@ -112,7 +128,13 @@ }
}()
go func() {
defer watchdog.LogOnPanic()
p.Traces = a.Sampler.Flush()
// Serializing both flushes, classic agent sampler and distributed sampler.
// In most cases only one will be used, so there should be no performance
// issue; only in transitional mode can both contain data.
p.Traces = a.ScoreEngine.Flush()
if a.PriorityEngine != nil {
p.Traces = append(p.Traces, a.PriorityEngine.Flush()...)
}
wg.Done()
}()

@@ -126,14 +148,17 @@
log.Info("exiting")
close(a.Receiver.exit)
a.Writer.Stop()
a.Sampler.Stop()
a.ScoreEngine.Stop()
if a.PriorityEngine != nil {
a.PriorityEngine.Stop()
}
return
}
}
}

// Process is the default work unit that receives a trace, transforms it and
// passes it downstream
// passes it downstream.
func (a *Agent) Process(t model.Trace) {
if len(t) == 0 {
// XXX Should never happen since we reject empty traces during
@@ -143,12 +168,31 @@ func (a *Agent) Process(t model.Trace) {
}

root := t.GetRoot()

// We get the address of the struct holding the stats associated with no tags
ts := a.Receiver.stats.getTagStats(Tags{})

// We choose the sampler dynamically, depending on trace content:
// if it has sampling priority info (whether 0, 1 or more), we respect
// it by using the priority sampler; else, we use the default score sampler.
s := a.ScoreEngine
priorityPtr := &ts.TracesPriorityNone
if a.PriorityEngine != nil {
if priority, ok := root.Metrics[samplingPriorityKey]; ok {
s = a.PriorityEngine

if priority == 0 {
priorityPtr = &ts.TracesPriority0
} else {
priorityPtr = &ts.TracesPriority1
}
}
}
atomic.AddInt64(priorityPtr, 1)

if root.End() < model.Now()-2*a.conf.BucketInterval.Nanoseconds() {
log.Errorf("skipping trace with root too far in past, root:%v", *root)

// We get the address of the struct holding the stats associated to the tags
ts := a.Receiver.stats.getTagStats(Tags{})

atomic.AddInt64(&ts.TracesDropped, 1)
atomic.AddInt64(&ts.SpansDropped, int64(len(t)))
return
@@ -160,7 +204,6 @@ func (a *Agent) Process(t model.Trace) {
}

log.Debugf("rejecting trace by filter: %T %v", f, *root)
ts := a.Receiver.stats.getTagStats(Tags{})
atomic.AddInt64(&ts.TracesFiltered, 1)
atomic.AddInt64(&ts.SpansFiltered, int64(len(t)))

@@ -201,7 +244,7 @@ }()
}()
go func() {
defer watchdog.LogOnPanic()
a.Sampler.Add(pt)
s.Add(pt)
}()
}

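The dynConf wiring above relies on the new dynamic configuration moved into the config package ("moved rate by service to config package in dynamic config"). A rough sketch of what it could look like; the concrete field and method names are assumptions, not taken from this diff:

    package config

    import "sync"

    // DynamicConfig holds configuration that can change at runtime, as
    // opposed to AgentConfig, which is fixed at startup.
    type DynamicConfig struct {
        RateByService RateByService
    }

    // NewDynamicConfig creates a new DynamicConfig with an empty rate map.
    func NewDynamicConfig() *DynamicConfig {
        return &DynamicConfig{RateByService: RateByService{rates: map[string]float64{}}}
    }

    // RateByService is a thread-safe map of sampling rates keyed by
    // "service:<name>,env:<env>" signatures; the priority engine writes it
    // and the receiver reads it to answer client libraries.
    type RateByService struct {
        mu    sync.RWMutex
        rates map[string]float64
    }

    // SetAll replaces the whole rate map atomically.
    func (rbs *RateByService) SetAll(rates map[string]float64) {
        rbs.mu.Lock()
        defer rbs.mu.Unlock()
        rbs.rates = rates
    }

    // GetAll returns a copy, so callers can iterate without holding the lock.
    func (rbs *RateByService) GetAll() map[string]float64 {
        rbs.mu.RLock()
        defer rbs.mu.RUnlock()
        out := make(map[string]float64, len(rbs.rates))
        for k, v := range rbs.rates {
            out[k] = v
        }
        return out
    }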
127 changes: 80 additions & 47 deletions agent/info.go
@@ -19,17 +19,19 @@ import (
)

var (
infoMu sync.RWMutex
infoReceiverStats []tagStats // only for the last minute
infoEndpointStats endpointStats // only for the last minute
infoWatchdogInfo watchdog.Info
infoSamplerInfo samplerInfo
infoPreSamplerStats sampler.PreSamplerStats
infoStart = time.Now()
infoOnce sync.Once
infoTmpl *template.Template
infoNotRunningTmpl *template.Template
infoErrorTmpl *template.Template
infoMu sync.RWMutex
infoReceiverStats []tagStats // only for the last minute
infoEndpointStats endpointStats // only for the last minute
infoWatchdogInfo watchdog.Info
infoSamplerInfo samplerInfo
infoPrioritySamplerInfo samplerInfo
infoRateByService map[string]float64
infoPreSamplerStats sampler.PreSamplerStats
infoStart = time.Now()
infoOnce sync.Once
infoTmpl *template.Template
infoNotRunningTmpl *template.Template
infoErrorTmpl *template.Template
)

const (
@@ -44,21 +46,23 @@
Hostname: {{.Status.Config.HostName}}
Receiver: {{.Status.Config.ReceiverHost}}:{{.Status.Config.ReceiverPort}}
API Endpoint: {{.Status.Config.APIEndpoint}}{{ range $i, $ts := .Status.Receiver }}
--- Receiver stats (1 min) ---
{{if $ts.Tags.Lang}}-> tags: {{ $ts.Tags.Lang }}, {{ $ts.Tags.LangVersion }}, {{ $ts.Tags.Interpreter }}, {{ $ts.Tags.TracerVersion }}
{{else}}-> tags: None{{end}}
-> tags: {{if $ts.Tags.Lang}}{{ $ts.Tags.Lang }}, {{ $ts.Tags.LangVersion }}, {{ $ts.Tags.Interpreter }}, {{ $ts.Tags.TracerVersion }}{{else}}None{{end}}
Traces received: {{ $ts.Stats.TracesReceived }} ({{ $ts.Stats.TracesBytes }} bytes)
Spans received: {{ $ts.Stats.SpansReceived }}
Services received: {{ $ts.Stats.ServicesReceived }} ({{ $ts.Stats.ServicesBytes }} bytes)
Total data received : {{ add $ts.Stats.TracesBytes $ts.Stats.ServicesBytes }} bytes{{if gt $ts.Stats.TracesDropped 0}}
Total data received: {{ add $ts.Stats.TracesBytes $ts.Stats.ServicesBytes }} bytes{{if gt $ts.Stats.TracesDropped 0}}
WARNING: Traces dropped: {{ $ts.Stats.TracesDropped }}
{{end}}{{if gt $ts.Stats.SpansDropped 0}}WARNING: Spans dropped: {{ $ts.Stats.SpansDropped }}{{end}}
------------------------------{{end}}{{if lt .Status.PreSampler.Rate 1.0}}
------------------------------{{end}}
{{ range $key, $value := .Status.RateByService }}
Sample rate for '{{ $key }}': {{percent $value}} %{{ end }}{{if lt .Status.PreSampler.Rate 1.0}}
WARNING: Pre-sampling traces: {{percent .Status.PreSampler.Rate}} %
{{end}}{{if .Status.PreSampler.Error}} WARNING: Pre-sampler: {{.Status.PreSampler.Error}}
{{end}}
@@ -93,75 +97,101 @@ func publishUptime() interface{} {

func updateReceiverStats(rs *receiverStats) {
infoMu.Lock()

defer infoMu.Unlock()
rs.RLock()
defer rs.RUnlock()

s := make([]tagStats, 0, len(rs.Stats))
for _, tagStats := range rs.Stats {
s = append(s, *tagStats)
}
rs.RUnlock()

infoReceiverStats = s
infoMu.Unlock()
}

func publishReceiverStats() interface{} {
infoMu.RLock()
rs := infoReceiverStats
infoMu.RUnlock()
return rs
defer infoMu.RUnlock()
return infoReceiverStats
}

func updateEndpointStats(es endpointStats) {
infoMu.Lock()
defer infoMu.Unlock()
infoEndpointStats = es
infoMu.Unlock()
}

func publishEndpointStats() interface{} {
infoMu.RLock()
es := infoEndpointStats
infoMu.RUnlock()
return es
defer infoMu.RUnlock()
return infoEndpointStats
}

func updateSamplerInfo(ss samplerInfo) {
infoMu.Lock()
defer infoMu.Unlock()

infoSamplerInfo = ss
infoMu.Unlock()
}

func publishSamplerInfo() interface{} {
infoMu.RLock()
ss := infoSamplerInfo
infoMu.RUnlock()
return ss
defer infoMu.RUnlock()
return infoSamplerInfo
}

func updatePrioritySamplerInfo(ss samplerInfo) {
infoMu.Lock()
defer infoMu.Unlock()

infoPrioritySamplerInfo = ss
}

func publishPrioritySamplerInfo() interface{} {
infoMu.RLock()
defer infoMu.RUnlock()
return infoPrioritySamplerInfo
}

func updateRateByService(rbs map[string]float64) {
infoMu.Lock()
defer infoMu.Unlock()
// Remove the default service and env entry: it can be inferred from other
// values, so it has little added value and could be confusing for users.
// Besides, if one still really wants it,
// curl http://localhost:8126/debug/vars would show it.
delete(rbs, "service:,env:")
infoRateByService = rbs
}

func publishRateByService() interface{} {
infoMu.RLock()
defer infoMu.RUnlock()
return infoRateByService
}

func updateWatchdogInfo(wi watchdog.Info) {
infoMu.Lock()
defer infoMu.Unlock()
infoWatchdogInfo = wi
infoMu.Unlock()
}

func publishWatchdogInfo() interface{} {
infoMu.RLock()
wi := infoWatchdogInfo
infoMu.RUnlock()
return wi
defer infoMu.RUnlock()
return infoWatchdogInfo
}

func updatePreSampler(ss sampler.PreSamplerStats) {
infoMu.Lock()
defer infoMu.Unlock()
infoPreSamplerStats = ss
infoMu.Unlock()
}

func publishPreSamplerStats() interface{} {
infoMu.RLock()
ss := infoPreSamplerStats
infoMu.RUnlock()
return ss
defer infoMu.RUnlock()
return infoPreSamplerStats
}

type infoVersion struct {
@@ -206,6 +236,8 @@ func initInfo(conf *config.AgentConfig) error {
expvar.Publish("receiver", expvar.Func(publishReceiverStats))
expvar.Publish("endpoint", expvar.Func(publishEndpointStats))
expvar.Publish("sampler", expvar.Func(publishSamplerInfo))
expvar.Publish("prioritysampler", expvar.Func(publishPrioritySamplerInfo))
expvar.Publish("ratebyservice", expvar.Func(publishRateByService))
expvar.Publish("watchdog", expvar.Func(publishWatchdogInfo))
expvar.Publish("presampler", expvar.Func(publishPreSamplerStats))

@@ -253,12 +285,13 @@ type StatusInfo struct {
MemStats struct {
Alloc uint64
} `json:"memstats"`
Version infoVersion `json:"version"`
Receiver []tagStats `json:"receiver"`
Endpoint endpointStats `json:"endpoint"`
Watchdog watchdog.Info `json:"watchdog"`
PreSampler sampler.PreSamplerStats `json:"presampler"`
Config config.AgentConfig `json:"config"`
Version infoVersion `json:"version"`
Receiver []tagStats `json:"receiver"`
RateByService map[string]float64 `json:"ratebyservice"`
Endpoint endpointStats `json:"endpoint"`
Watchdog watchdog.Info `json:"watchdog"`
PreSampler sampler.PreSamplerStats `json:"presampler"`
Config config.AgentConfig `json:"config"`
}

func getProgramBanner(version string) (string, string) {
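To inspect the newly published rate-by-service data on a running agent, one can query the expvar endpoint mentioned in the updateRateByService comment above. A minimal sketch, assuming the receiver listens on the default localhost:8126:

    package main

    import (
        "encoding/json"
        "fmt"
        "net/http"
    )

    func main() {
        // expvar serves every published variable in one JSON object;
        // "ratebyservice" is the name registered in initInfo above.
        resp, err := http.Get("http://localhost:8126/debug/vars")
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()

        var vars struct {
            RateByService map[string]float64 `json:"ratebyservice"`
        }
        if err := json.NewDecoder(resp.Body).Decode(&vars); err != nil {
            panic(err)
        }
        for sig, rate := range vars.RateByService {
            fmt.Printf("%s -> %.2f\n", sig, rate) // e.g. service:myapp,env:prod -> 0.35
        }
    }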
[diffs for the remaining 24 changed files not shown]
