[sampling] add distributed tracing capabilities #310
Conversation
Force-pushed from 1a3a573 to e85a2e6
Force-pushed from 13f326b to 3ffd04c
agent/receiver_responses.go
Outdated
for k, v := range response.Rates {
	tags := strings.Split(k, ",") // could be "service:myapp,env:"
	statsd.Client.Gauge("datadog.trace_agent.sampling_rate", v, tags, 1)
What's the purpose of this metric? It will be high cardinality while not providing a particularly useful signal.
I might only keep it for debugging (for that matter it's quite convenient, as you know exactly what kind of decision the sampler makes on a per-service basis), but yes, the cardinality is probably dangerous in production use.
agent/sampler.go
Outdated
// NewSampler creates a new empty sampler ready to be started
func NewSampler(conf *config.AgentConfig) *Sampler {
	engine := sampler.NewSampler(conf.ExtraSampleRate, conf.MaxTPS)
	engineType := strings.Replace(fmt.Sprint(reflect.TypeOf(engine))[1:], ".", "_", -1)
This makes things more complex and less obvious. Why not simply make it explicit?
I think initially I was worried about "." and "*" not playing nice with JSON serialization. I now realise they should only be hash keys and not object attributes, so it should be fine.
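The reviewer's suggestion of making the engine name explicit might look like the sketch below, where each engine reports its own name through a method instead of deriving it via reflect.TypeOf. The interface and names here are invented for illustration, not taken from the PR:

```go
package main

import "fmt"

// Engine is implemented by every sampling engine; Type makes the engine's
// name explicit, replacing the reflection-based string munging.
// (Interface and constant strings are hypothetical.)
type Engine interface {
	Type() string
}

type ScoreSampler struct{}

func (s *ScoreSampler) Type() string { return "sampler_ScoreSampler" }

type PrioritySampler struct{}

func (p *PrioritySampler) Type() string { return "sampler_PrioritySampler" }

func main() {
	var e Engine = &ScoreSampler{}
	fmt.Println(e.Type())
}
```

With this shape, the name is guaranteed JSON-safe by construction rather than by sanitizing a reflected type string.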
sampler/sampler.go
Outdated
if maxTPSrate < 1 {
	sampled = ApplySampleRate(root, maxTPSrate)
// if maxTPSrate < 1 {
if maxTPSrate < sampleRate { // [TODO:christian] double-check this
Why did you change that? maxTPS and sampleRate are unrelated here.
It was not totally clear to me at the time. I probably need to write a test that is broken by this change (I don't think it exists now): they are indeed unrelated, and calling ApplySampleRate twice does accumulate the sampling rates rather than "override" the first call (probably why I got it wrong). Will revert that and write the corresponding test.
Yeah, happy to dive more into this piece of the code to be sure we do the right thing. maxTPSrate < 1 is still the right condition to check for figuring out if we need to apply the maxTPSrate to the incoming trace in the case of signature sampling.
sampler/sampler.go
Outdated
}

// newGenericSampler returns an initialized Sampler, allowing to choose the signature computer and the sample rate applier.
func newGenericSampler(extraRate float64, maxTPS float64, computer SignatureComputer, applier SampleRateApplier) *ScoreSampler {
Overall, this attempt to get a "generic" sampler makes things much harder, while in practice we will never have arbitrary combinations of computers/appliers. I had a hard time understanding the current code flow, and it is error prone. There is one non-distributed SignatureSampler and one distributed PrioritySampler; these are quite independent. Whatever code we want to share between them should simply be functions.
sampler/rate.go
Outdated
)

const (
	samplingPriorityKey = "sampling.priority"
agent/receiver.go
Outdated
fallthrough
case v03:
	traceChan = r.traces
case v04:
Re: https://github.com/DataDog/dd-trace-py/pull/325/files#r136864220
We still want to be able not to use priority sampling and to use the "old" signature sampling instead. What should we do?
1. we distinguish by endpoint version: v03 is for signature, v04 is for priority?
2. v04 supports both, and we figure out which one to use by looking at the sent payload.
I'd vote for 2.; that way the client will be simpler and won't have to figure out which protocol version to use. But then we have to figure out whether to use signature or priority sampling. Possibilities: a payload header, or checking whether traces have a priority meta.
OK, got it. Quite a change, I think, since we might end up setting an explicit priority meta to 0. I'd be against putting something in the header, because it's as complex as opening a new endpoint and far less explicit (and the protocol does change: v0.3 returns "OK" while v0.4 returns a JSON).
So what would you suggest?
My concern is that we will keep coming up with new versions of the endpoint/protocol while we still want to support both versions of the sampler. So freezing the signature sampling to the v0.3 endpoint might be cumbersome in the future.
Combination of both:
- change to v04 because the protocol actually changed (the return value of OK hardcoded in v03 is indeed different from the valid JSON returned by v04)
- read the actual content of traces to put them in the right pipeline, regardless of what envelope (version) they are coming from
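The second point, routing by trace content, could be sketched as below: check the root span for the priority entry and pick the pipeline accordingly. The Span stand-in and helper name are assumptions for illustration; the real model.Span has more fields:

```go
package main

import "fmt"

// Span is a minimal stand-in for model.Span (hypothetical; the real
// struct carries service, name, resource, meta, etc.).
type Span struct {
	Metrics map[string]float64
}

const samplingPriorityKey = "_sampling_priority_v1"

// hasSamplingPriority reports whether the client set a sampling priority
// on the root span. Traces carrying one (even an explicit 0) would go to
// the priority pipeline; the rest to the signature sampler, regardless of
// which endpoint version they arrived on. (Helper name is an assumption.)
func hasSamplingPriority(root *Span) bool {
	if root.Metrics == nil {
		return false
	}
	_, ok := root.Metrics[samplingPriorityKey]
	return ok
}

func main() {
	withPriority := &Span{Metrics: map[string]float64{samplingPriorityKey: 1}}
	without := &Span{}
	fmt.Println(hasSamplingPriority(withPriority), hasSamplingPriority(without))
}
```

Note that an explicit priority of 0 still counts as "present", which matches the later remark that clients would need to send priority 0 explicitly rather than omit it.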
sampler/combinedsignature.go
Outdated
}

// ComputeSignatureWithRootAndEnv generates the signature of a trace knowing its root
// Signature based on the hash of (env, service, name, resource, is_error) for the root, plus the set of
Note to self: it is time to simplify this signature to only care about the root (env, service, name, resource, is_error).
agent/agent.go
Outdated
Concentrator:       c,
Filters:            f,
Sampler:            s,
DistributedSampler: ds,
Let's call this "prioritySampler" instead.
agent/agent.go
Outdated
p.Traces = a.Sampler.Flush()
p.Traces = append(p.Traces, a.DistributedSampler.Flush()...)
So, all traces in the same payload? How does the backend know which sampling logic to apply?
cc @MattHauglustaine https://github.com/DataDog/dd-go/pull/3741/files#diff-a82552013e7d8d23211f360de47ecef8R65
Oh, that's fine. I assumed the payload would contain either all new traces or all old traces, but that would be an easy fix.
agent/agent.go
Outdated
// Process is the default work unit that receives a trace, transforms it and
// passes it downstream
func (a *Agent) Process(t model.Trace) {
	a.processWithSampler(t, a.Sampler)
I guess the test between signature vs priority sampler could be done here, by checking if the trace has a sampling priority?
Indeed (but it requires sending an explicit sampling priority set to 0, which is not done yet)
Yeah, we should have our clients either not report the priority at all (in Python, distributed_sampler set to None) or always report it (a non-None distributed_sampler).
agent/info.go
Outdated
func publishRateByService() interface{} {
	infoMu.RLock()
	rbs := infoRateByService
	infoMu.RUnlock()
Simply:
infoMu.RLock()
defer infoMu.RUnlock()
return infoRateByService
Generally speaking, I'm a strong advocate of using defer whenever possible, but I also remember us hunting down defers lately ;) Will happily use that pattern anyway, no problem, thanks for suggesting it.
I don't remember the trouble you had with it. But yeah, clearly not a big deal/doesn't really matter.
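The deferred-unlock pattern suggested above, in runnable form (the surrounding variables are stubbed here for illustration; the real ones live in agent/info.go):

```go
package main

import (
	"fmt"
	"sync"
)

var (
	infoMu            sync.RWMutex
	infoRateByService = map[string]float64{"service:myapp,env:": 0.5} // stub data
)

// publishRateByService reads the shared value under a read lock; the
// deferred RUnlock runs on every return path, which is the point of
// preferring defer over a manual unlock before the return.
func publishRateByService() interface{} {
	infoMu.RLock()
	defer infoMu.RUnlock()
	return infoRateByService
}

func main() {
	fmt.Println(publishRateByService())
}
```

One caveat with either version: the map itself is returned, so readers share it after the lock is released; that was equally true of the original snippet.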
agent/receiver_responses.go
Outdated
@@ -46,3 +60,22 @@ func HTTPOK(w http.ResponseWriter) {
	w.WriteHeader(http.StatusOK)
	io.WriteString(w, "OK\n")
}

// HTTPRateByService outputs, as a JSON, the recommended sampling rates for all services.
func HTTPRateByService(w http.ResponseWriter, rates *sampler.RateByService) {
Should we also make an endpoint that only returns this? Helpful for debugging?
I also haven't figured out yet what to do in the client before the first flush: we won't have the per-service rate. Maybe that could be used?
This is already exposed by expvar if you curl http://localhost:8126/debug/vars (for debugging, the metric mentioned at the beginning of the PR can be helpful too).
Force-pushed from b46b1fa to ae58a96
sampler/rate.go
Outdated
// asks for the sampling rate for such a trace, it gets this result
// - use the information that was in the meta tags ("_sampling_priority_v1") to
// decide whether this one should be sampled or not.
func (csra *clientSampleRateApplier) ApplySampleRate(root *model.Span, sampleRate float64) bool {
Why this logic here?
- for priority sampling, there is no extra sample rate to apply here. We only rely on what comes from the client.
- what we send to the client should include the maxTPS component.
Force-pushed from d36544c to 3fd4e81
@@ -322,6 +326,12 @@ APM_CONF:
if v, e := conf.GetFloat("trace.sampler", "max_traces_per_second"); e == nil {
	c.MaxTPS = v
}
if v := strings.ToLower(conf.GetDefault("trace.sampler", "priority_sampling", "")); v == "yes" || v == "true" {
	c.PrioritySampling = true
What is the purpose of making it an option? Isn't the check for the sampling meta enough?
Plus, why have it off by default? It would be extra friction, while priority sampling is already off by default in the clients.
Making it an option is convenient because it avoids having to restart your app to disable it in the client lib. But I might change the default anyway, indeed, to lower the barrier to using it.
Cool, the option is fair and the true default is good.
return fmt.Sprintf("traces received: %v, traces dropped: %v, traces filtered: %v, "+
	"traces amount: %v bytes, services received: %v, services amount: %v bytes",
	tracesReceived, tracesDropped, tracesFiltered, tracesBytes, servicesReceived, servicesBytes)
return fmt.Sprintf("traces received: %d, traces dropped: %d, traces filtered: %d, "+
Is that used somewhere for logging? If so, the line is getting long; we could:
- not repeat traces/services
- rephrase "dropped"/"priority 0"/"high priority", which don't mean anything to users.
If it is just debug/internal, it's fine.
It's indeed used in logs; I reverted to the previous version, which does not display the priority info.
sampler/prioritysampler.go
Outdated
// Check for the maxTPS limit, and if we require an extra sampling.
// No need to check if we already decided not to keep the trace.
maxTPSrate := s.Sampler.GetMaxTPSSampleRate()
What is the purpose of these 2 lines? They look like lines to be removed.
As with priority sampling, maxTPS only drives the internal mechanics of the per-service sampling computation; once the client has made the decision, we stick to it.
sampler/prioritysampler.go
Outdated
s.Sampler.Stop()
}

func updateSampleRateForPriority(root *model.Span, sampleRate, defaultSampleRate float64, rates *RateByService) {
Would deserve a nice docstring, telling that this only updates the rates, so that the receiver is up-to-date.
sampler/prioritysampler.go
Outdated
func updateSampleRateForPriority(root *model.Span, sampleRate, defaultSampleRate float64, rates *RateByService) {
	initialRate := GetTraceAppliedSampleRate(root)
	newRate := initialRate * sampleRate
	SetTraceAppliedSampleRate(root, newRate)
It is incorrect to chain the sample rates (pick the previous / multiply / store the new one) for priority sampling.
It might even badly affect server-side sampling, as it would give wrong information (this new rate isn't even the actual rate applied by the client + agent).
I'm not sure this is clear enough; ping me for more details.
You can drop these 3 lines.
sampler/prioritysampler.go
Outdated
if root.Meta != nil {
	env = root.Meta["env"]
}
rates.Set(root.Service, env, newRate)
Updating rates only at that moment isn't enough, since it doesn't cover:
- Over time, per-service rates can change (because of the decay). Here we don't take that into consideration, so we may provide an "old" rate (which would be too low).
- When AdjustScoring runs, all coefficients can be adjusted, and so are the resulting rates.
const (
	processStatsInterval = time.Minute
	languageHeaderKey    = "X-Datadog-Reported-Languages"
	samplingPriorityKey  = "_sampling_priority_v1"
This is only for v1, private and for internal usage until we have OT support with the final sampling.priority tag. Also, this is a metric for now.
}
}
}
atomic.AddInt64(priorityPtr, 1)
Here this looks redundant with received traces, but not quite: traces can actually be dropped for several reasons, some because they were not normalisable (which happens before this), some because they are outdated (which happens after). All in all, having this metric (which is then exposed with tags depending on whether priority is none, 0, or non-zero) is very useful for inspecting what is going on.
http.HandleFunc("/v0.3/traces", r.httpHandleWithVersion(v03, r.handleTraces))
http.HandleFunc("/v0.3/services", r.httpHandleWithVersion(v03, r.handleServices))

// current collector API
http.HandleFunc("/v0.4/traces", r.httpHandleWithVersion(v04, r.handleTraces))
This new endpoint has the same input as /v0.3; only /v0.4/traces returns a JSON with the sampling rates for all services/envs.
for period := 0; period < initPeriods+periods; period++ {
	s.Sampler.Backend.DecayScore()
	s.Sampler.AdjustScoring()
This really needs to be called, and having it running now and then gives some fuzziness to the results (so the tolerance interval is slightly larger than in the previous sampler).
sampler/ratebyservice.go
Outdated
type RateByService struct {
	timeout time.Duration
	rates   map[string]timeoutValue
	mutex   sync.RWMutex
This one is here to allow exposing the rate while updating it on the fly.
sampler/score.go
Outdated
@@ -21,23 +21,38 @@ func SampleByRate(traceID uint64, sampleRate float64) bool {
	return true
}

func capTo1(f float64) float64 {
dummy function to avoid copy/pasting
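The diff does not show the helper's body, but a plausible implementation of such a clamp, consistent with its name, would be (a sketch, not necessarily the PR's exact code):

```go
package main

import "fmt"

// capTo1 clamps a sample rate to an upper bound of 1, so callers don't
// each repeat the comparison. (Body is an assumption; only the signature
// appears in the diff.)
func capTo1(f float64) float64 {
	if f > 1 {
		return 1
	}
	return f
}

func main() {
	fmt.Println(capTo1(1.5), capTo1(0.25))
}
```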
agent/info_test.go
Outdated
------------------------------

Rate for 'service:myapp,env:dev': 12.3 %
Let's call this "Sample rate"; as it is user-exposed, that's more understandable.
agent/receiver.go
Outdated
@@ -62,12 +67,13 @@ type HTTPReceiver struct {
}

// NewHTTPReceiver returns a pointer to a new HTTPReceiver
func NewHTTPReceiver(conf *config.AgentConfig) *HTTPReceiver {
func NewHTTPReceiver(conf *config.AgentConfig, rates *sampler.RateByService) *HTTPReceiver {
It looks like a non-sustainable change to the API: the more things we return to the Agent, the more complex this will get.
What about making rates a part of conf? That should also simplify the way it flows around the codebase.
Or, if we are worried about mixing static with mutable config, maybe have a dynamicConfig object?
It bothers me even more that you have to change test code for the v01/02/03 endpoints because of the v04 endpoint change.
Feel free to discard this comment, as this might be early over-engineering. But I guess we could at least leave a comment somewhere that we should put these rates in a larger struct at some point.
OK, just introduced a DynamicConfig object. Indeed, with it, the receiver constructor is less bound to the current implementation; it only has to know "there's something dynamic".
sampler/ratebyservice.go
Outdated
if rbs.rates == nil {
	rbs.rates = make(map[string]timeoutValue, 1)
}
timeout := rbs.timeout
I'm not sure about the need for a timeout at this level of the interface.
It makes the logic more complex, while the only thing we'd expect from RateByService is to expose the content of the per-service PrioritySampler. So I'd expect the expiration to happen inside the PrioritySampler (and I think it already does).
It would just have to have a new method to directly set RateByService.rates to perfectly match the internals of the PrioritySampler.
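The reviewer's proposal might look like the sketch below: RateByService holds no expiration logic of its own, and the sampler simply overwrites the whole map whenever its internal state changes. The flat string keys and method names are illustrative assumptions; the real type keys on a service/env pair and has a different surface:

```go
package main

import (
	"fmt"
	"sync"
)

// RateByService exposes the per-service rates computed by the priority
// sampler. Expiry stays inside the sampler; this type is only a
// thread-safe view. (Sketch; not the PR's actual implementation.)
type RateByService struct {
	mu    sync.RWMutex
	rates map[string]float64
}

// SetAll replaces the content to exactly match the sampler's internal state.
func (rbs *RateByService) SetAll(rates map[string]float64) {
	rbs.mu.Lock()
	defer rbs.mu.Unlock()
	rbs.rates = make(map[string]float64, len(rates))
	for k, v := range rates {
		rbs.rates[k] = v
	}
}

// Get returns the rate for a "service:...,env:..." key and whether it is known.
func (rbs *RateByService) Get(key string) (float64, bool) {
	rbs.mu.RLock()
	defer rbs.mu.RUnlock()
	v, ok := rbs.rates[key]
	return v, ok
}

func main() {
	var rbs RateByService
	rbs.SetAll(map[string]float64{"service:myapp,env:dev": 0.123})
	fmt.Println(rbs.Get("service:myapp,env:dev"))
}
```

Copying the map in SetAll keeps readers and the sampler from sharing mutable state, which is what allows dropping the per-entry timeout.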
Force-pushed from e620406 to 75bd88e
[distributed sampling] introduced service sampler
[distributed sampling] added new channel to handle distributed traces
[distributed tracing] fixed stats status
[distributed sampling] test typo
[distributed sampling] added JSON output with rate by service
[distributed sampling] encapsulating response in a container
[distributed sampling] handling distributed traces for real, sending feedback to client lib
[distributed sampling] trusting client library info in by-service sampler
[distributed sampling] fixed agent tests
[distributed sampling] adding metrics and visual output about internal sampling rates
[distributed sampling] added sample rate feedback in -info command
[distributed sampling] added test for ApplyRate funcs
[distributed sampling] added test for service signature
[distributed sampling] replaced sampling.priority by _sampling_priority_v1 — as this is still experimental, replacing sampling.priority by _sampling_priority_v1, and also converting it from a meta to a metric. This makes the code easier to handle and saves many string-to-number conversions; additionally, the _sampling_priority_v1 metric is naturally trimmed from the front-end UI because a) it's a metric and b) it starts with "_"
[distributed sampling] removed debug metric
[distributed sampling] using genuine engine type as key in info hash
[distributed sampling] fixed sampling on services (algo and tests)
[distributed tracing] DistributedSampler -> PrioritySampler
[distributed sampling] removing generic constructor
[distributed sampling] refactored samplers to share common code without hooks
[distributed sampling] using defers in info publishing code
[distributed sampling] choosing sampler depending on trace content
[distributed sampling] updated receiver tests
[distributed sampling] made sampler internal public again
[distributed sampling] fixed expvar to avoid dashboard migration
[distributed tracing] added metrics to follow sampling priority on receiver
[distributed sampling] returning a meaningful value for the default sample rate
[distributed sampling] returning something else than 1 for default sampling value
[distributed sampling] removed debug message
[distributed sampling] enabling priority sampling by default
[distributed sampling] removing useless sample rate metric when priority sampling
[distributed sampling] removed pointless code
[distributed sampling] added docstring
[distributed sampling] stats rework (using tags for sampling priority)
[distributed sampling] typo fixes
[distributed sampling] refined '-info' output — this patch shows "Sample Rate" instead of "Rate" and removes the default sample rate, which is a fallback and should not be used in most cases
[distributed sampling] moved rate by service to config package in dynamic config
[distributed sampling] using dynamic config to report rate by service
[distributed sampling] refactored and rewrote tests
Force-pushed from 75bd88e to 01d72f5
config/agent.go
Outdated
PreSampleRate          float64
MaxTPS                 float64
PrioritySampling       bool
PrioritySamplerTimeout time.Duration
This is no longer used, let's fully drop it.
LGTM! Just need to remove PrioritySamplerTimeout and we are all good.