This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Introduce new writers to API v0.2 #343

Merged 9 commits on Jan 11, 2018
3 changes: 3 additions & 0 deletions Rakefile
@@ -17,6 +17,7 @@ desc 'Bootstrap CI environment'
task :bootstrap do
tools = {
'github.com/golang/lint' => {
# TODO: upgrade golint
version: 'b8599f7d71e7fead76b25aeb919c0e2558672f4a',
main_pkg: './golint',
check_cmd: 'golint',
@@ -68,6 +69,8 @@ PACKAGES = %w(
EXCLUDE_LINT = [
'model/services_gen.go',
'model/trace_gen.go',
'model/trace.pb.go',
'model/trace_payload.pb.go',
'model/span_gen.go',
'model/span.pb.go',
]
125 changes: 60 additions & 65 deletions agent/agent.go
@@ -1,7 +1,6 @@
package main

import (
"sync"
"sync/atomic"
"time"

@@ -19,7 +18,6 @@ import (

const (
processStatsInterval = time.Minute
languageHeaderKey = "X-Datadog-Reported-Languages"
samplingPriorityKey = "_sampling_priority_v1"
)

@@ -40,12 +38,14 @@ func (pt *processedTrace) weight() float64 {

// Agent struct holds all the sub-routine structs and makes the data flow between them
type Agent struct {
Receiver *HTTPReceiver
Concentrator *Concentrator
Filters []filters.Filter
ScoreEngine *Sampler
PriorityEngine *Sampler
Writer *writer.Writer
Receiver *HTTPReceiver
Concentrator *Concentrator
Filters []filters.Filter
ScoreSampler *Sampler
PrioritySampler *Sampler
TraceWriter *writer.TraceWriter
ServiceWriter *writer.ServiceWriter
StatsWriter *writer.StatsWriter

// config
conf *config.AgentConfig
@@ -60,41 +60,56 @@ type Agent struct {
// NewAgent returns a new Agent object, ready to be started
func NewAgent(conf *config.AgentConfig, exit chan struct{}) *Agent {
dynConf := config.NewDynamicConfig()
r := NewHTTPReceiver(conf, dynConf)

// inter-component channels
rawTraceChan := make(chan model.Trace, 5000) // about 1000 traces/sec for 5 sec, TODO: move to *model.Trace
sampledTraceChan := make(chan *model.Trace)
statsChan := make(chan []model.StatsBucket)
serviceChan := make(chan model.ServicesMetadata, 50)

// create components
r := NewHTTPReceiver(conf, dynConf, rawTraceChan, serviceChan)
c := NewConcentrator(
conf.ExtraAggregators,
conf.BucketInterval.Nanoseconds(),
statsChan,
)
f := filters.Setup(conf)
ss := NewScoreEngine(conf)

ss := NewScoreSampler(conf, sampledTraceChan)
var ps *Sampler
if conf.PrioritySampling {
// Use priority sampling for distributed tracing only if conf says so
ps = NewPriorityEngine(conf, dynConf)
// TODO: remove the option once we are comfortable; it is true by default.
ps = NewPrioritySampler(conf, dynConf, sampledTraceChan)
}
tw := writer.NewTraceWriter(conf, sampledTraceChan)
sw := writer.NewStatsWriter(conf, statsChan)
svcW := writer.NewServiceWriter(conf, serviceChan)

w := writer.NewWriter(conf)
w.InServices = r.services
// wire components together
tw.InTraces = sampledTraceChan
sw.InStats = statsChan
svcW.InServices = serviceChan

return &Agent{
Receiver: r,
Concentrator: c,
Filters: f,
ScoreEngine: ss,
PriorityEngine: ps,
Writer: w,
conf: conf,
dynConf: dynConf,
exit: exit,
die: die,
Receiver: r,
Concentrator: c,
Filters: f,
ScoreSampler: ss,
PrioritySampler: ps,
TraceWriter: tw,
StatsWriter: sw,
ServiceWriter: svcW,
conf: conf,
dynConf: dynConf,
exit: exit,
die: die,
}
}

// Run starts the router routines and individual pieces, then stops them when the exit order is received
func (a *Agent) Run() {
flushTicker := time.NewTicker(a.conf.BucketInterval)
defer flushTicker.Stop()

// it's really important to use a ticker for this, and with a not too short
// interval, for this is our guarantee that the process won't start and kill
// itself too fast (nightmare loop)
@@ -104,54 +119,34 @@ func (a *Agent) Run() {
// update the data served by expvar so that we don't expose a 0 sample rate
info.UpdatePreSampler(*a.Receiver.preSampler.Stats())

// TODO: unify component APIs. Use Start/Stop as non-blocking ways of controlling the blocking Run loop.
// Like we do with TraceWriter.
a.Receiver.Run()
a.Writer.Run()
a.ScoreEngine.Run()
if a.PriorityEngine != nil {
a.PriorityEngine.Run()
a.TraceWriter.Start()
a.StatsWriter.Start()
a.ServiceWriter.Start()
a.Concentrator.Start()
a.ScoreSampler.Run()
if a.PrioritySampler != nil {
a.PrioritySampler.Run()
}

for {
select {
case t := <-a.Receiver.traces:
a.Process(t)
case <-flushTicker.C:
p := model.AgentPayload{
HostName: a.conf.HostName,
Env: a.conf.DefaultEnv,
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer watchdog.LogOnPanic()
p.Stats = a.Concentrator.Flush()
wg.Done()
}()
go func() {
defer watchdog.LogOnPanic()
// Serializing both flushes, classic agent sampler and distributed sampler,
// in most cases only one will be used, so in the mainstream case there should
// be no performance issue; only in transitional mode can both contain data.
p.Traces = a.ScoreEngine.Flush()
if a.PriorityEngine != nil {
p.Traces = append(p.Traces, a.PriorityEngine.Flush()...)
}
wg.Done()
}()

wg.Wait()
p.SetExtra(languageHeaderKey, a.Receiver.Languages())

a.Writer.InPayloads <- p
case <-watchdogTicker.C:
a.watchdog()
case <-a.exit:
log.Info("exiting")
close(a.Receiver.exit)
a.Writer.Stop()
a.ScoreEngine.Stop()
if a.PriorityEngine != nil {
a.PriorityEngine.Stop()
a.Concentrator.Stop()
a.TraceWriter.Stop()
a.StatsWriter.Stop()
a.ServiceWriter.Stop()
a.ScoreSampler.Stop()
if a.PrioritySampler != nil {
a.PrioritySampler.Stop()
}
return
}
Expand All @@ -177,11 +172,11 @@ func (a *Agent) Process(t model.Trace) {
// We choose the sampler dynamically, depending on trace content: if it
// carries sampling priority info (whether 0, 1 or more), we respect it by
// using the priority sampler; else, we use the default score sampler.
s := a.ScoreEngine
s := a.ScoreSampler
priorityPtr := &ts.TracesPriorityNone
if a.PriorityEngine != nil {
if a.PrioritySampler != nil {
if priority, ok := root.Metrics[samplingPriorityKey]; ok {
s = a.PriorityEngine
s = a.PrioritySampler

if priority == 0 {
priorityPtr = &ts.TracesPriority0
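Note for writers new to this code path: the heart of the change is visible in NewAgent above. Instead of one Writer consuming a monolithic AgentPayload, each data type (traces, stats, services) now flows through its own channel into a dedicated writer. Below is a minimal, self-contained sketch of that fan-out pattern; all names (trace, fakeReceiver, fakeSampler, fakeTraceWriter) are illustrative stand-ins, not the agent's actual API.

package main

import "fmt"

type trace struct{ spans int }

// fakeReceiver stands in for the HTTP receiver: it decodes incoming
// payloads and pushes raw traces downstream.
func fakeReceiver(raw chan<- trace) {
	for i := 1; i <= 3; i++ {
		raw <- trace{spans: i}
	}
	close(raw)
}

// fakeSampler decides which traces to keep and forwards them to the writer.
func fakeSampler(raw <-chan trace, sampled chan<- trace) {
	for t := range raw {
		sampled <- t // a real sampler would drop some traces here
	}
	close(sampled)
}

// fakeTraceWriter consumes sampled traces, playing the role of writer.TraceWriter.
func fakeTraceWriter(sampled <-chan trace, done chan<- struct{}) {
	for t := range sampled {
		fmt.Printf("flushing trace with %d span(s)\n", t.spans)
	}
	close(done)
}

func main() {
	// Channels are created in one place and injected into each component,
	// mirroring rawTraceChan / sampledTraceChan in NewAgent above.
	raw := make(chan trace, 10)
	sampled := make(chan trace)
	done := make(chan struct{})

	go fakeReceiver(raw)
	go fakeSampler(raw, sampled)
	go fakeTraceWriter(sampled, done)

	<-done
}

The payoff of this wiring is that the Run loop no longer assembles payloads itself; each writer flushes independently, which is what lets the old AgentPayload plumbing above be deleted.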
56 changes: 54 additions & 2 deletions agent/concentrator.go
@@ -3,36 +3,88 @@ package main
import (
"sort"
"sync"
"time"

log "github.com/cihub/seelog"

"github.com/DataDog/datadog-trace-agent/model"
"github.com/DataDog/datadog-trace-agent/statsd"
"github.com/DataDog/datadog-trace-agent/watchdog"
)

// Concentrator produces time bucketed statistics from a stream of raw traces.
// https://en.wikipedia.org/wiki/Knelson_concentrator
// Gets an imperial shitton of traces, and outputs pre-computed data structures
// that allow finding the gold (stats) amongst the traces.
type Concentrator struct {
// list of attributes to use for extra aggregation
aggregators []string
bsize int64
// bucket duration in nanoseconds
bsize int64

OutStats chan []model.StatsBucket

exit chan struct{}
exitWG *sync.WaitGroup

buckets map[int64]*model.StatsRawBucket // buckets used to aggregate stats per timestamp
mu sync.Mutex
}

// NewConcentrator initializes a new concentrator ready to be started
func NewConcentrator(aggregators []string, bsize int64) *Concentrator {
func NewConcentrator(aggregators []string, bsize int64, out chan []model.StatsBucket) *Concentrator {
c := Concentrator{
aggregators: aggregators,
bsize: bsize,
buckets: make(map[int64]*model.StatsRawBucket),

OutStats: out,

exit: make(chan struct{}),
exitWG: &sync.WaitGroup{},
}
sort.Strings(c.aggregators)
return &c
}

// Start starts the concentrator.
func (c *Concentrator) Start() {
go func() {
defer watchdog.LogOnPanic()
c.Run()
}()
}

// Run runs the main loop of the concentrator goroutine. Traces are received
// through `Add`; this loop only deals with flushing.
func (c *Concentrator) Run() {
c.exitWG.Add(1)
defer c.exitWG.Done()

// flush with the same period as stats buckets
flushTicker := time.NewTicker(time.Duration(c.bsize) * time.Nanosecond)
defer flushTicker.Stop()

log.Debug("starting concentrator")

for {
select {
case <-flushTicker.C:
c.OutStats <- c.Flush()
case <-c.exit:
log.Info("exiting concentrator, computing remaining stats")
c.OutStats <- c.Flush()
return
}
}
}

// Stop stops the main Run loop.
func (c *Concentrator) Stop() {
close(c.exit)
c.exitWG.Wait()
}

// Add appends this trace's statistics to the proper stats bucket
func (c *Concentrator) Add(t processedTrace) {
c.mu.Lock()
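The Start/Run/Stop trio added to the Concentrator is a common Go lifecycle pattern: Start launches Run in a goroutine, Run flushes on a ticker until an exit channel closes, and Stop closes that channel and waits on a WaitGroup. Here is a minimal runnable sketch of the pattern; names are hypothetical, and note the sketch registers the WaitGroup in Start, whereas the diff above does it at the top of Run.

package main

import (
	"fmt"
	"sync"
	"time"
)

type flusher struct {
	out    chan string
	exit   chan struct{}
	exitWG sync.WaitGroup
}

// Start launches the run loop in its own goroutine, like Concentrator.Start.
func (f *flusher) Start() {
	f.exitWG.Add(1) // registered before the goroutine starts, so Stop cannot race it
	go f.run()
}

// run flushes periodically until the exit channel is closed.
func (f *flusher) run() {
	defer f.exitWG.Done()
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			f.out <- "periodic flush"
		case <-f.exit:
			f.out <- "final flush" // drain remaining state on shutdown
			return
		}
	}
}

// Stop signals the loop to exit and blocks until it has finished.
func (f *flusher) Stop() {
	close(f.exit)
	f.exitWG.Wait()
}

func main() {
	f := &flusher{out: make(chan string, 16), exit: make(chan struct{})}
	f.Start()
	time.Sleep(25 * time.Millisecond)
	f.Stop()
	close(f.out)
	for msg := range f.out {
		fmt.Println(msg)
	}
}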
6 changes: 4 additions & 2 deletions agent/concentrator_test.go
@@ -12,7 +12,8 @@ import (
var testBucketInterval = time.Duration(2 * time.Second).Nanoseconds()

func NewTestConcentrator() *Concentrator {
return NewConcentrator([]string{}, time.Second.Nanoseconds())
statsChan := make(chan []model.StatsBucket)
return NewConcentrator([]string{}, time.Second.Nanoseconds(), statsChan)
}

// getTsInBucket gives a timestamp in ns which is `offset` buckets late
@@ -39,7 +40,8 @@ func testSpan(c *Concentrator, spanID uint64, duration, offset int64, service, r

func TestConcentratorStatsCounts(t *testing.T) {
assert := assert.New(t)
c := NewConcentrator([]string{}, testBucketInterval)
statsChan := make(chan []model.StatsBucket)
c := NewConcentrator([]string{}, testBucketInterval, statsChan)

now := model.Now()
alignedNow := now - now%c.bsize
2 changes: 1 addition & 1 deletion agent/main.go
@@ -148,7 +148,7 @@ func runAgent(exit chan struct{}) {

if opts.info {
if err := info.Info(os.Stdout, agentConf); err != nil {
// need not display the error, Info should do it already
os.Stdout.WriteString(fmt.Sprintf("failed to print info: %s\n", err))
Review comment: Less "should" more action 👍 😄

os.Exit(1)
}
return
9 changes: 6 additions & 3 deletions agent/receiver.go
@@ -68,17 +68,20 @@ type HTTPReceiver struct {
}

// NewHTTPReceiver returns a pointer to a new HTTPReceiver
func NewHTTPReceiver(conf *config.AgentConfig, dynConf *config.DynamicConfig) *HTTPReceiver {
func NewHTTPReceiver(
conf *config.AgentConfig, dynConf *config.DynamicConfig, traces chan model.Trace, services chan model.ServicesMetadata,
) *HTTPReceiver {
// use buffered channels so that handlers are not waiting on downstream processing
return &HTTPReceiver{
traces: make(chan model.Trace, 5000), // about 1000 traces/sec for 5 sec
services: make(chan model.ServicesMetadata, 50),
conf: conf,
dynConf: dynConf,
stats: info.NewReceiverStats(),
preSampler: sampler.NewPreSampler(conf.PreSampleRate),
exit: make(chan struct{}),

traces: traces,
services: services,

maxRequestBodyLength: maxRequestBodyLength,
debug: strings.ToLower(conf.LogLevel) == "debug",
}
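Passing the trace and service channels into NewHTTPReceiver, rather than creating them inside the constructor, is a small dependency-injection change that also pays off in tests: the caller owns both ends of the pipe, so a test can feed input and assert on output without touching HTTP. A hedged sketch of the idea, with hypothetical names rather than the receiver's real handler:

package main

import "fmt"

type trace struct{ id uint64 }

// receiver takes its output channel from the caller, as NewHTTPReceiver now does.
type receiver struct {
	traces chan trace
}

func newReceiver(traces chan trace) *receiver {
	return &receiver{traces: traces}
}

// handle stands in for the HTTP handler body that enqueues decoded traces.
func (r *receiver) handle(t trace) {
	r.traces <- t
}

func main() {
	traces := make(chan trace, 1) // the test owns the channel...
	r := newReceiver(traces)
	r.handle(trace{id: 42})
	fmt.Println("got trace", (<-traces).id) // ...and can assert on the output end
}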