This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit 02c98a9: Merge pull request #356 from DataDog/benjamin/fix-transaction-sampling
Fix transaction sampling by moving it to TransactionSampler
Authored by LotharSee on Feb 6, 2018 (2 parents: 1071b69 + 4ba9111)
Showing 8 changed files with 106 additions and 67 deletions.
61 changes: 35 additions & 26 deletions agent/agent.go
@@ -38,16 +38,17 @@ func (pt *processedTrace) weight() float64 {
 
 // Agent struct holds all the sub-routines structs and make the data flow between them
 type Agent struct {
-    Receiver         *HTTPReceiver
-    Concentrator     *Concentrator
-    Filters          []filters.Filter
-    ScoreSampler     *Sampler
-    PrioritySampler  *Sampler
-    TraceWriter      *writer.TraceWriter
-    ServiceWriter    *writer.ServiceWriter
-    StatsWriter      *writer.StatsWriter
-    ServiceExtractor *TraceServiceExtractor
-    ServiceMapper    *ServiceMapper
+    Receiver           *HTTPReceiver
+    Concentrator       *Concentrator
+    Filters            []filters.Filter
+    ScoreSampler       *Sampler
+    PrioritySampler    *Sampler
+    TransactionSampler *TransactionSampler
+    TraceWriter        *writer.TraceWriter
+    ServiceWriter      *writer.ServiceWriter
+    StatsWriter        *writer.StatsWriter
+    ServiceExtractor   *TraceServiceExtractor
+    ServiceMapper      *ServiceMapper
 
     // config
     conf *config.AgentConfig
@@ -80,8 +81,9 @@ func NewAgent(conf *config.AgentConfig, exit chan struct{}) *Agent {
     )
     f := filters.Setup(conf)
 
-    ss := NewScoreSampler(conf, sampledTraceChan, analyzedTransactionChan)
-    ps := NewPrioritySampler(conf, dynConf, sampledTraceChan, analyzedTransactionChan)
+    ss := NewScoreSampler(conf, sampledTraceChan)
+    ps := NewPrioritySampler(conf, dynConf, sampledTraceChan)
+    ts := NewTransactionSampler(conf, analyzedTransactionChan)
     se := NewTraceServiceExtractor(serviceChan)
     sm := NewServiceMapper(serviceChan, filteredServiceChan)
     tw := writer.NewTraceWriter(conf, sampledTraceChan, analyzedTransactionChan)
@@ -93,20 +95,21 @@ func NewAgent(conf *config.AgentConfig, exit chan struct{}) *Agent {
     sw.InStats = statsChan
 
     return &Agent{
-        Receiver:         r,
-        Concentrator:     c,
-        Filters:          f,
-        ScoreSampler:     ss,
-        PrioritySampler:  ps,
-        TraceWriter:      tw,
-        StatsWriter:      sw,
-        ServiceWriter:    svcW,
-        ServiceExtractor: se,
-        ServiceMapper:    sm,
-        conf:             conf,
-        dynConf:          dynConf,
-        exit:             exit,
-        die:              die,
+        Receiver:           r,
+        Concentrator:       c,
+        Filters:            f,
+        ScoreSampler:       ss,
+        PrioritySampler:    ps,
+        TransactionSampler: ts,
+        TraceWriter:        tw,
+        StatsWriter:        sw,
+        ServiceWriter:      svcW,
+        ServiceExtractor:   se,
+        ServiceMapper:      sm,
+        conf:               conf,
+        dynConf:            dynConf,
+        exit:               exit,
+        die:                die,
     }
 }
 
@@ -263,6 +266,12 @@ func (a *Agent) Process(t model.Trace) {
             sampler.Add(pt)
         }()
     }
+    if a.TransactionSampler.Enabled() {
+        go func() {
+            defer watchdog.LogOnPanic()
+            a.TransactionSampler.Add(pt)
+        }()
+    }
 }
 
 func (a *Agent) watchdog() {
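Taken together, the NewAgent and Process changes route analyzed spans through a dedicated path: NewTransactionSampler gets analyzedTransactionChan as its output, and writer.NewTraceWriter consumes the same channel. The sketch below reproduces that producer/consumer shape in a self-contained form; span, transactionSampler, and traceWriter are simplified stand-ins rather than the repository's real types, and the buffer size is an arbitrary choice for the sketch.

package main

import (
    "fmt"
    "sync"
)

// span is a stand-in for *model.Span; the real type carries many more fields.
type span struct {
    TraceID uint64
    Service string
}

// transactionSampler is the producer side: it pushes spans it decides to
// analyze onto the shared channel, mirroring what TransactionSampler.Add does.
type transactionSampler struct {
    analyzed chan<- span
}

func (s *transactionSampler) Add(spans []span) {
    for _, sp := range spans {
        s.analyzed <- sp // in the agent this only happens for sampled top-level spans
    }
}

// traceWriter is the consumer side: it drains the channel and would batch the
// spans into payloads, mirroring writer.TraceWriter's input loop.
type traceWriter struct {
    in <-chan span
}

func (w *traceWriter) run(wg *sync.WaitGroup) {
    defer wg.Done()
    for sp := range w.in {
        fmt.Printf("writer received analyzed span: trace=%d service=%s\n", sp.TraceID, sp.Service)
    }
}

func main() {
    // analyzedTransactionChan plays the role of the channel created in NewAgent;
    // the buffer size here is an arbitrary choice for the sketch.
    analyzedTransactionChan := make(chan span, 8)

    ts := &transactionSampler{analyzed: analyzedTransactionChan}
    tw := &traceWriter{in: analyzedTransactionChan}

    var wg sync.WaitGroup
    wg.Add(1)
    go tw.run(&wg)

    ts.Add([]span{{TraceID: 1, Service: "web-api"}, {TraceID: 2, Service: "db"}})
    close(analyzedTransactionChan) // the real agent keeps the channel open for its lifetime
    wg.Wait()
}

Because the writer drains the channel on its own goroutine, the Process hot path only pays the cost of a channel send, and only for spans that are actually selected for analysis.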
39 changes: 7 additions & 32 deletions agent/sampler.go
@@ -16,9 +16,7 @@ import (
 
 // Sampler chooses wich spans to write to the API
 type Sampler struct {
-    sampled               chan *model.Trace
-    analyzed              chan *model.Span
-    analyzedRateByService map[string]float64
+    sampled chan *model.Trace
 
     // For stats
     keptTraceCount int
@@ -30,22 +28,18 @@ type Sampler struct {
 }
 
 // NewScoreSampler creates a new empty sampler ready to be started
-func NewScoreSampler(conf *config.AgentConfig, sampled chan *model.Trace, analyzed chan *model.Span) *Sampler {
+func NewScoreSampler(conf *config.AgentConfig, sampled chan *model.Trace) *Sampler {
     return &Sampler{
-        engine:                sampler.NewScoreEngine(conf.ExtraSampleRate, conf.MaxTPS),
-        sampled:               sampled,
-        analyzed:              analyzed,
-        analyzedRateByService: conf.AnalyzedRateByService,
+        engine:  sampler.NewScoreEngine(conf.ExtraSampleRate, conf.MaxTPS),
+        sampled: sampled,
     }
 }
 
 // NewPrioritySampler creates a new empty distributed sampler ready to be started
-func NewPrioritySampler(conf *config.AgentConfig, dynConf *config.DynamicConfig, sampled chan *model.Trace, analyzed chan *model.Span) *Sampler {
+func NewPrioritySampler(conf *config.AgentConfig, dynConf *config.DynamicConfig, sampled chan *model.Trace) *Sampler {
     return &Sampler{
-        engine:                sampler.NewPriorityEngine(conf.ExtraSampleRate, conf.MaxTPS, &dynConf.RateByService),
-        sampled:               sampled,
-        analyzed:              analyzed,
-        analyzedRateByService: conf.AnalyzedRateByService,
+        engine:  sampler.NewPriorityEngine(conf.ExtraSampleRate, conf.MaxTPS, &dynConf.RateByService),
+        sampled: sampled,
     }
 }

@@ -70,25 +64,6 @@ func (s *Sampler) Add(t processedTrace) {
         s.keptTraceCount++
         s.sampled <- &t.Trace
     }
-
-    // inspect the WeightedTrace so that we can identify top-level spans
-    for _, span := range t.WeightedTrace {
-        s.Analyze(span)
-    }
 }
-
-// Analyze queues a span for analysis, applying any sample rate specified
-// Only top-level spans are eligible to be analyzed
-func (s *Sampler) Analyze(span *model.WeightedSpan) {
-    if !span.TopLevel {
-        return
-    }
-
-    if analyzeRate, ok := s.analyzedRateByService[span.Service]; ok {
-        if sampler.SampleByRate(span.TraceID, analyzeRate) {
-            s.analyzed <- span.Span
-        }
-    }
-}
 
 // Stop stops the sampler
51 changes: 51 additions & 0 deletions agent/transaction_sampler.go
@@ -0,0 +1,51 @@
+package main
+
+import (
+    "github.com/DataDog/datadog-trace-agent/config"
+    "github.com/DataDog/datadog-trace-agent/model"
+    "github.com/DataDog/datadog-trace-agent/sampler"
+)
+
+// TransactionSampler extracts and samples analyzed spans
+type TransactionSampler struct {
+    analyzed              chan *model.Span
+    analyzedRateByService map[string]float64
+}
+
+// NewTransactionSampler creates a new empty transaction sampler
+func NewTransactionSampler(conf *config.AgentConfig, analyzed chan *model.Span) *TransactionSampler {
+    return &TransactionSampler{
+        analyzed:              analyzed,
+        analyzedRateByService: conf.AnalyzedRateByService,
+    }
+}
+
+// Enabled tells if the transaction analysis is enabled
+func (s *TransactionSampler) Enabled() bool {
+    return len(s.analyzedRateByService) > 0
+}
+
+// Add extracts analyzed spans and send them to its `analyzed` channel
+func (s *TransactionSampler) Add(t processedTrace) {
+    // inspect the WeightedTrace so that we can identify top-level spans
+    for _, span := range t.WeightedTrace {
+        if s.Analyzed(span) {
+            s.analyzed <- span.Span
+        }
+    }
+}
+
+// Analyzed tells if a span should be considered as analyzed
+// Only top-level spans are eligible to be analyzed
+func (s *TransactionSampler) Analyzed(span *model.WeightedSpan) bool {
+    if !span.TopLevel {
+        return false
+    }
+
+    if analyzeRate, ok := s.analyzedRateByService[span.Service]; ok {
+        if sampler.SampleByRate(span.TraceID, analyzeRate) {
+            return true
+        }
+    }
+    return false
+}
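Analyzed is the entire sampling decision: the span must be top-level, its service must appear in analyzedRateByService, and the trace must be kept by sampler.SampleByRate at that service's rate. The sketch below replays that decision outside the agent under stated assumptions: weightedSpan, sampleByRate, and analyzed are simplified stand-ins, the multiplier and comparison are illustrative rather than the repository's exact hash, and the only assumption made about SampleByRate is that it derives a deterministic keep/drop choice from the trace ID and a rate in [0, 1].

package main

import (
    "fmt"
    "math"
)

// weightedSpan is a simplified stand-in for model.WeightedSpan.
type weightedSpan struct {
    TraceID  uint64
    Service  string
    TopLevel bool
}

// sampleByRate is a stand-in for sampler.SampleByRate: a deterministic
// keep/drop decision derived from the trace ID and a rate in [0, 1].
func sampleByRate(traceID uint64, rate float64) bool {
    if rate >= 1 {
        return true
    }
    const knuthFactor = uint64(1111111111111111111) // illustrative multiplier, not the real hash
    return float64(traceID*knuthFactor) < rate*math.MaxUint64
}

// analyzed mirrors TransactionSampler.Analyzed: only top-level spans of
// services listed in analyzedRateByService are eligible, and eligible spans
// are then kept at the configured rate.
func analyzed(span weightedSpan, analyzedRateByService map[string]float64) bool {
    if !span.TopLevel {
        return false
    }
    if rate, ok := analyzedRateByService[span.Service]; ok {
        return sampleByRate(span.TraceID, rate)
    }
    return false
}

func main() {
    rates := map[string]float64{"web-api": 0.5} // hypothetical per-service analysis rates
    spans := []weightedSpan{
        {TraceID: 42, Service: "web-api", TopLevel: true},
        {TraceID: 43, Service: "web-api", TopLevel: false}, // not top-level: never analyzed
        {TraceID: 44, Service: "worker", TopLevel: true},   // no rate configured: never analyzed
    }
    for _, s := range spans {
        fmt.Printf("trace %d (%s) -> analyzed=%v\n", s.TraceID, s.Service, analyzed(s, rates))
    }
}

Keying the decision on the trace ID rather than a random draw, as assumed here, keeps the choice stable for a given trace across repeated evaluations.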
2 changes: 1 addition & 1 deletion info/info.go
@@ -83,7 +83,7 @@ const (
 --- Writer stats (1 min) ---
- Traces: {{.Status.TraceWriter.Payloads}} payloads, {{.Status.TraceWriter.Traces}} traces, {{.Status.TraceWriter.Bytes}} bytes
+ Traces: {{.Status.TraceWriter.Payloads}} payloads, {{.Status.TraceWriter.Traces}} traces, {{if gt .Status.TraceWriter.Transactions 0}}{{.Status.TraceWriter.Transactions}} transactions, {{end}}{{.Status.TraceWriter.Bytes}} bytes
 {{if gt .Status.TraceWriter.Errors 0}}WARNING: Traces API errors (1 min): {{.Status.TraceWriter.Errors}}{{end}}
 Stats: {{.Status.StatsWriter.Payloads}} payloads, {{.Status.StatsWriter.StatsBuckets}} stats buckets, {{.Status.StatsWriter.Bytes}} bytes
 {{if gt .Status.StatsWriter.Errors 0}}WARNING: Stats API errors (1 min): {{.Status.StatsWriter.Errors}}{{end}}
2 changes: 1 addition & 1 deletion info/test_cases/okay.info
@@ -21,6 +21,6 @@ Trace Agent (v 0.99.0)
 
 --- Writer stats (1 min) ---
 
- Traces: 4 payloads, 26 traces, 3245 bytes
+ Traces: 4 payloads, 26 traces, 123 transactions, 3245 bytes
 Stats: 6 payloads, 12 stats buckets, 8329 bytes
 Services: 1 payloads, 2 services, 1234 bytes
2 changes: 1 addition & 1 deletion info/test_cases/okay.json
@@ -1,7 +1,7 @@
{
"cmdline": ["./trace-agent"],
"config": {"Enabled":true,"HostName":"localhost.localdomain","DefaultEnv":"none","APIEndpoint":"https://trace.agent.datadoghq.com","APIEnabled":true,"APIPayloadBufferMaxSize":16777216,"BucketInterval":10000000000,"ExtraAggregators":[],"ExtraSampleRate":1,"MaxTPS":10,"ReceiverHost":"localhost","ReceiverPort":8126,"ConnectionLimit":2000,"ReceiverTimeout":0,"StatsdHost":"127.0.0.1","StatsdPort":8125,"LogLevel":"INFO","LogFilePath":"/var/log/datadog/trace-agent.log"},
-"trace_writer": {"Payloads":4,"Bytes":3245,"Traces":26,"Errors":0},
+"trace_writer": {"Payloads":4,"Bytes":3245,"Traces":26,"Transactions":123,"Errors":0},
"stats_writer": {"Payloads":6,"Bytes":8329,"StatsBuckets":12,"Errors":0},
"service_writer": {"Payloads":1,"Bytes":1234,"Services":2,"Errors":0},
"memstats": {"Alloc":773552,"TotalAlloc":773552,"Sys":3346432,"Lookups":6,"Mallocs":7231,"Frees":561,"HeapAlloc":773552,"HeapSys":1572864,"HeapIdle":49152,"HeapInuse":1523712,"HeapReleased":0,"HeapObjects":6670,"StackInuse":524288,"StackSys":524288,"MSpanInuse":24480,"MSpanSys":32768,"MCacheInuse":4800,"MCacheSys":16384,"BuckHashSys":2675,"GCSys":131072,"OtherSys":1066381,"NextGC":4194304,"LastGC":0,"PauseTotalNs":0,"PauseNs":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"PauseEnd":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"NumGC":0,"GCCPUFraction":0,"EnableGC":true,"DebugGC":false,"BySize":[{"Size":0,"Mallocs":0,"Frees":0},{"Size":8,"Mallocs":126,"Frees":0},{"Size":16,"Mallocs":825,"Frees":0},{"Size":32,"Mallocs":4208,"Frees":0},{"Size":48,"Mallocs":345,"Frees":0},{"Size":64,"Mallocs":262,"Frees":0},{"Size":80,"Mallocs":93,"Frees":0},{"Size":96,"Mallocs":70,"Frees":0},{"Size":112,"Mallocs":97,"Frees":0},{"Size":128,"Mallocs":24,"Frees":0},{"Size":144,"Mallocs":25,"Frees":0},{"Size":160,"Mallocs":57,"Frees":0},{"Size":176,"Mallocs":128,"Frees":0},{"Size":192,"Mallocs":13,"Frees":0},{"Size":208,"Mallocs":77,"Frees":0},{"Size":224,"Mallocs":3,"Frees":0},{"Size":240,"Mallocs":2,"Frees":0},{"Size":256,"Mallocs":17,"Frees":0},{"Size":288,"Mallocs":64,"Frees":0},{"Size":320,"Mallocs":12,"Frees":0},{"Size":352,"Mallocs":20,"Frees":0},{"Size":384,"Mallocs":1,"Frees":0},{"Size":416,"Mallocs":59,"Frees":0},{"Size":448,"Mallocs":0,"Frees":0},{"Size":480,"Mallocs":3,"Frees":0},{"Size":512,"Mallocs":2,"Frees":0},{"Size":576,"Mallocs":17,"Frees":0},{"Size":640,"Mallocs":6,"Frees":0},{"Size":704,"Mallocs":10,"Frees":0},{"Size":768,"Mallocs":0,"Frees":0},{"Size":896,"Mallocs":11,"Frees":0},{"Size":1024,"Mallocs":11,"Frees":0},{"Size":1152,"Mallocs":12,"Frees":0},{"Size":1280,"Mallocs":2,"Frees":0},{"Size":1408,"Mallocs":2,"Frees":0},{"Size":1536,"Mallocs":0,"Frees":0},{"Size":1664,"Mallocs":10,"Frees":0},{"Size":2048,"Mallocs":17,"Frees":0},{"Size":2304,"Mallocs":7,"Frees":0},{"Size":2560,"Mallocs":1,"Frees":0},{"Size":2816,"Mallocs":1,"Frees":0},{"Size":3072,"Mallocs":1,"Frees":0},{"Size":3328,"Mallocs":7,"Frees":0},{"Size":4096,"Mallocs":4,"Frees":0},{"Size":4608,"Mallocs":1,"Frees":0},{"Size":5376,"Mallocs":6,"Frees":0},{"Size":6144,"Mallocs":4,"Frees":0},{"Size":6400,"Mallocs":0,"Frees":0},{"Size":6656,"Mallocs":1,"Frees":0},{"Size":6912,"Mallocs":0,"Frees":0},{"Size":8192,"Mallocs":0,"Frees":0},{"Size":8448,"Mallocs":0,"Frees":0},{"Size":8704,"Mallocs":1,"Frees":0},{"Size":9472,"Mallocs":0,"Frees":0},{"Size":10496,"Mallocs":0,"Frees":0},{"Size":12288,"Mallocs":1,"Frees":0},{"Size
":13568,"Mallocs":0,"Frees":0},{"Size":14080,"Mallocs":0,"Frees":0},{"Size":16384,"Mallocs":0,"Frees":0},{"Size":16640,"Mallocs":0,"Frees":0},{"Size":17664,"Mallocs":1,"Frees":0}]},
13 changes: 7 additions & 6 deletions info/writer.go
@@ -2,12 +2,13 @@ package info
 
 // TraceWriterInfo represents statistics from the trace writer.
 type TraceWriterInfo struct {
-    Payloads int64
-    Traces   int64
-    Spans    int64
-    Errors   int64
-    Retries  int64
-    Bytes    int64
+    Payloads     int64
+    Traces       int64
+    Transactions int64
+    Spans        int64
+    Errors       int64
+    Retries      int64
+    Bytes        int64
 }
 
 // ServiceWriterInfo represents statistics from the service writer.
3 changes: 3 additions & 0 deletions writer/trace_writer.go
@@ -197,6 +197,7 @@ func (w *TraceWriter) flush() {
     }
 
     atomic.AddInt64(&w.stats.Traces, int64(numTraces))
+    atomic.AddInt64(&w.stats.Transactions, int64(numTransactions))
     atomic.AddInt64(&w.stats.Spans, int64(w.spansInBuffer))
 
     tracePayload := model.TracePayload{
@@ -256,13 +257,15 @@ func (w *TraceWriter) updateInfo() {
     // Load counters and reset them for the next flush
     twInfo.Payloads = atomic.SwapInt64(&w.stats.Payloads, 0)
     twInfo.Traces = atomic.SwapInt64(&w.stats.Traces, 0)
+    twInfo.Transactions = atomic.SwapInt64(&w.stats.Transactions, 0)
     twInfo.Spans = atomic.SwapInt64(&w.stats.Spans, 0)
     twInfo.Bytes = atomic.SwapInt64(&w.stats.Bytes, 0)
     twInfo.Retries = atomic.SwapInt64(&w.stats.Retries, 0)
     twInfo.Errors = atomic.SwapInt64(&w.stats.Errors, 0)
 
     w.statsClient.Count("datadog.trace_agent.trace_writer.payloads", int64(twInfo.Payloads), nil, 1)
     w.statsClient.Count("datadog.trace_agent.trace_writer.traces", int64(twInfo.Traces), nil, 1)
+    w.statsClient.Count("datadog.trace_agent.trace_writer.transactions", int64(twInfo.Transactions), nil, 1)
     w.statsClient.Count("datadog.trace_agent.trace_writer.spans", int64(twInfo.Spans), nil, 1)
     w.statsClient.Count("datadog.trace_agent.trace_writer.bytes", int64(twInfo.Bytes), nil, 1)
     w.statsClient.Count("datadog.trace_agent.trace_writer.retries", int64(twInfo.Retries), nil, 1)
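The new Transactions counter is handled the same way as the writer's existing counters: atomic.AddInt64 on the flush path, then atomic.SwapInt64(&counter, 0) when updateInfo reports, so each reporting window starts from zero. Below is a minimal, self-contained sketch of that add-then-swap pattern; the writerStats type and the numbers are illustrative, and only the atomic shape mirrors the diff.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// writerStats mimics the counters in TraceWriterInfo that the writer
// accumulates between flushes.
type writerStats struct {
    Traces       int64
    Transactions int64
}

func main() {
    var stats writerStats
    var wg sync.WaitGroup

    // Hot path: several goroutines record work as payloads are flushed.
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                atomic.AddInt64(&stats.Traces, 1)
                atomic.AddInt64(&stats.Transactions, 3)
            }
        }()
    }
    wg.Wait()

    // Reporting path: read and reset in one step so the next window starts at zero.
    traces := atomic.SwapInt64(&stats.Traces, 0)
    transactions := atomic.SwapInt64(&stats.Transactions, 0)
    fmt.Printf("reported %d traces, %d transactions\n", traces, transactions)
    fmt.Printf("counters after swap: %d traces, %d transactions\n",
        atomic.LoadInt64(&stats.Traces), atomic.LoadInt64(&stats.Transactions))
}

Swap rather than a separate load and store keeps read-and-reset atomic, so increments that land between the two steps are never lost.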
