Skip to content

Commit

Permalink
avoid setting ingestion_reason:probabilistic always for OTLP error spans
Browse files Browse the repository at this point in the history
  • Loading branch information
keisku committed Jan 31, 2025
1 parent 91794f6 commit 8974346
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 54 deletions.
22 changes: 18 additions & 4 deletions pkg/trace/api/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,15 +505,29 @@ func (o *OTLPReceiver) createChunks(tracesByID map[uint64]pb.Trace, prioritiesBy
Tags: map[string]string{"_dd.otlp_sr": rate},
Spans: spans,
}
if o.conf.ProbabilisticSamplerEnabled {
chunk.Priority = int32(sampler.PriorityNone)
// Skip the probabilistic sampler of OTLPReceiver.
// Do not set `_dd.p.dm` because either the Probabilistic Sampler enabled by this config or the Error Sampler will be the decision maker.
traceChunks = append(traceChunks, chunk)
continue
}
var samplingPriorty sampler.SamplingPriority
var decisionMaker string
if p, ok := prioritiesByID[k]; ok {
// a manual decision has been made by the user
chunk.Priority = int32(p)
traceutil.SetMeta(spans[0], "_dd.p.dm", "-4")
samplingPriorty = p
decisionMaker = "-4"
} else {
// we use the probabilistic sampler to decide
chunk.Priority = int32(o.sample(k))
traceutil.SetMeta(spans[0], "_dd.p.dm", "-9")
samplingPriorty = o.sample(k)
decisionMaker = "-9"
}
// Do not set `_dd.p.dm` when the sampling priority is drop, because the Error Sampler will be the decision maker for dropped traces.
if samplingPriorty.IsKeep() {
traceutil.SetMeta(spans[0], "_dd.p.dm", decisionMaker)
}
chunk.Priority = int32(samplingPriorty)
traceChunks = append(traceChunks, chunk)
}
return traceChunks
Expand Down
125 changes: 76 additions & 49 deletions pkg/trace/api/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,58 +697,85 @@ func testOTLPSpanNameV2(enableReceiveResourceSpansV2 bool, t *testing.T) {
}

func TestCreateChunks(t *testing.T) {
t.Run("ReceiveResourceSpansV1", func(t *testing.T) {
testCreateChunk(false, t)
})

t.Run("ReceiveResourceSpansV2", func(t *testing.T) {
testCreateChunk(true, t)
})
}

func testCreateChunk(enableReceiveResourceSpansV2 bool, t *testing.T) {
cfg := NewTestConfig(t)
if !enableReceiveResourceSpansV2 {
cfg.Features["disable_receive_resource_spans_v2"] = struct{}{}
}
cfg.OTLPReceiver.ProbabilisticSampling = 50
o := NewOTLPReceiver(nil, cfg, &statsd.NoOpClient{}, &timing.NoopReporter{})
const (
traceID1 = 123 // sampled by 50% rate
traceID2 = 1237892138897 // not sampled by 50% rate
traceID3 = 1237892138898 // not sampled by 50% rate
)
traces := map[uint64]pb.Trace{
traceID1: {{TraceID: traceID1, SpanID: 1}, {TraceID: traceID1, SpanID: 2}},
traceID2: {{TraceID: traceID2, SpanID: 1}, {TraceID: traceID2, SpanID: 2}},
traceID3: {{TraceID: traceID3, SpanID: 1}, {TraceID: traceID3, SpanID: 2}},
}
priorities := map[uint64]sampler.SamplingPriority{
traceID3: sampler.PriorityUserKeep,
tests := []struct {
enableReceiveResourceSpansV2 bool
probabilisticSamplerEnabled bool
}{
{
enableReceiveResourceSpansV2: false,
probabilisticSamplerEnabled: false,
},
{
enableReceiveResourceSpansV2: true,
probabilisticSamplerEnabled: false,
},
{
enableReceiveResourceSpansV2: false,
probabilisticSamplerEnabled: true,
},
{
enableReceiveResourceSpansV2: true,
probabilisticSamplerEnabled: true,
},
}
chunks := o.createChunks(traces, priorities)
var found int
for _, c := range chunks {
id := c.Spans[0].TraceID
require.ElementsMatch(t, c.Spans, traces[id])
require.Equal(t, "0.50", c.Tags["_dd.otlp_sr"])
switch id {
case traceID1:
//nolint:revive // TODO(OTEL) Fix revive linter
found += 1
require.Equal(t, "-9", c.Spans[0].Meta["_dd.p.dm"])
require.Equal(t, int32(1), c.Priority)
case traceID2:
found += 2
require.Equal(t, "-9", c.Spans[0].Meta["_dd.p.dm"])
require.Equal(t, int32(0), c.Priority)
case traceID3:
found += 3
require.Equal(t, "-4", c.Spans[0].Meta["_dd.p.dm"])
require.Equal(t, int32(2), c.Priority)
for _, tt := range tests {
var names []string
if tt.enableReceiveResourceSpansV2 {
names = append(names, "ReceiveResourceSpansV2")
} else {
names = append(names, "ReceiveResourceSpansV1")
}
if tt.probabilisticSamplerEnabled {
names = append(names, "ProbabilisticSamplerEnabled")
} else {
names = append(names, "ProbabilisticSamplerDisabled")
}
t.Run(strings.Join(names, " "), func(t *testing.T) {
cfg := NewTestConfig(t)
if !tt.enableReceiveResourceSpansV2 {
cfg.Features["disable_receive_resource_spans_v2"] = struct{}{}
}
cfg.OTLPReceiver.ProbabilisticSampling = 50
cfg.ProbabilisticSamplerEnabled = tt.probabilisticSamplerEnabled
o := NewOTLPReceiver(nil, cfg, &statsd.NoOpClient{}, &timing.NoopReporter{})
const (
traceID1 = 123 // sampled by 50% rate
traceID2 = 1237892138897 // not sampled by 50% rate
traceID3 = 1237892138898 // not sampled by 50% rate
)
traces := map[uint64]pb.Trace{
traceID1: {{TraceID: traceID1, SpanID: 1}, {TraceID: traceID1, SpanID: 2}},
traceID2: {{TraceID: traceID2, SpanID: 1}, {TraceID: traceID2, SpanID: 2}},
traceID3: {{TraceID: traceID3, SpanID: 1}, {TraceID: traceID3, SpanID: 2}},
}
priorities := map[uint64]sampler.SamplingPriority{
traceID3: sampler.PriorityUserKeep,
}
chunks := o.createChunks(traces, priorities)
require.Len(t, chunks, len(traces))
for _, c := range chunks {
if tt.probabilisticSamplerEnabled {
require.Emptyf(t, c.Spans[0].Meta["_dd.p.dm"], "decision maker must be empty")
require.Equalf(t, int32(sampler.PriorityNone), c.Priority, "priority must be none")
} else {
id := c.Spans[0].TraceID
require.ElementsMatch(t, c.Spans, traces[id])
require.Equal(t, "0.50", c.Tags["_dd.otlp_sr"])
switch id {
case traceID1:
require.Equal(t, "-9", c.Spans[0].Meta["_dd.p.dm"], "traceID1: dm must be -9")
require.Equal(t, int32(1), c.Priority, "traceID1: priority must be 1")
case traceID2:
require.Empty(t, c.Spans[0].Meta["_dd.p.dm"], "traceID2: dm must be empty")
require.Equal(t, int32(0), c.Priority, "traceID2: priority must be 0")
case traceID3:
require.Equal(t, "-4", c.Spans[0].Meta["_dd.p.dm"], "traceID3: dm must be -4")
require.Equal(t, int32(2), c.Priority, "traceID3: priority must be 2")
}
}
}
})
}
require.Equal(t, 6, found)
}

func TestOTLPReceiveResourceSpans(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/trace/sampler/prioritysampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *PrioritySampler) Sample(now time.Time, trace *pb.TraceChunk, root *pb.S
// Regardless of rates, sampling here is based on the metadata set
// by the client library. Which, is turn, is based on agent hints,
// but the rule of thumb is: respect client choice.
sampled := samplingPriority > 0
sampled := samplingPriority.IsKeep()

serviceSignature := ServiceSignature{Name: root.Service, Env: toSamplerEnv(tracerEnv, s.agentEnv)}
s.sampler.metrics.record(sampled, newMetricsKey(serviceSignature.Name, serviceSignature.Env, &samplingPriority))
Expand Down
5 changes: 5 additions & 0 deletions pkg/trace/sampler/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ const (
samplerHasher = uint64(1111111111111111111)
)

// IsKeep reports whether the priority represents a "keep" sampling
// decision, i.e. either an automatic keep or a user-forced keep.
func (s SamplingPriority) IsKeep() bool {
	switch s {
	case PriorityAutoKeep, PriorityUserKeep:
		return true
	}
	return false
}

func (s SamplingPriority) tagValue() string {
switch s {
case PriorityUserDrop:
Expand Down

0 comments on commit 8974346

Please sign in to comment.