Skip to content

Commit

Permalink
fix: record sample rate in kept record during stress relief
Browse files Browse the repository at this point in the history
  • Loading branch information
VinozzZ committed Aug 14, 2024
1 parent 1aad5b2 commit 057badd
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
9 changes: 5 additions & 4 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
timeout = 60 * time.Second
}

now := time.Now()
now := i.Clock.Now()
trace = &types.Trace{
APIHost: sp.APIHost,
APIKey: sp.APIKey,
Expand Down Expand Up @@ -528,7 +528,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
// great! trace is live. add the span.
trace.AddSpan(sp)

markTraceForSending := false
var markTraceForSending bool
// if the span count has exceeded our SpanLimit, mark the trace for sending
if tcfg.SpanLimit > 0 && uint(trace.SpanCount()) > tcfg.SpanLimit {
markTraceForSending = true
Expand All @@ -547,7 +547,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
timeout = 2 * time.Second
}

trace.SendBy = time.Now().Add(timeout)
trace.SendBy = i.Clock.Now().Add(timeout)
}
}

Expand Down Expand Up @@ -583,6 +583,7 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool,
ArrivalTime: now,
SendBy: now,
}
trace.SetSampleRate(rate)
// we do want a record of how we disposed of traces in case more come in after we've
// turned off stress relief (if stress relief is on we'll keep making the same decisions)
i.sampleTraceCache.Record(trace, keep, reason)
Expand Down Expand Up @@ -733,7 +734,7 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
}
trace.Sent = true

traceDur := time.Since(trace.ArrivalTime)
traceDur := i.Clock.Since(trace.ArrivalTime)
i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds()))
i.Metrics.Histogram("trace_span_count", float64(trace.DescendantCount()))
if trace.RootSpan != nil {
Expand Down
79 changes: 79 additions & 0 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,85 @@ func TestAddAdditionalAttributes(t *testing.T) {

}

// TestStressReliefSampleRate verifies that when a span is handled by
// ProcessSpanImmediately while stress relief is active, the sample rate chosen
// by the stress reliever is stored in the sampled-trace cache record and is
// reflected on the events handed to the transmission.
//
// The test sends two spans for the same trace ID:
//  1. a span carrying a "trace.parent_id" field, which is expected to be kept
//     and transmitted with the stress reliever's rate (100);
//  2. a span with no parent ID field and its own SampleRate of 10, which is
//     expected to be transmitted with a rate of 1000 while the cached trace
//     record still reports 100.
//
// NOTE(review): 1000 is presumably the span's own rate (10) multiplied by the
// recorded trace rate (100) — confirm against ProcessSpanImmediately.
func TestStressReliefSampleRate(t *testing.T) {
	conf := &config.MockConfig{
		GetTracesConfigVal: config.TracesConfig{
			SendTicker:   config.Duration(2 * time.Millisecond),
			SendDelay:    config.Duration(1 * time.Millisecond),
			TraceTimeout: config.Duration(5 * time.Minute),
			MaxBatchSize: 500,
		},
		GetSamplerTypeVal:  &config.DeterministicSamplerConfig{SampleRate: 1},
		ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
		GetCollectionConfigVal: config.CollectionConfig{
			ShutdownDelay: config.Duration(1 * time.Millisecond),
		},
	}

	transmission := &transmit.MockTransmission{}
	transmission.Start()
	coll := newTestCollector(conf, transmission)

	// The cache is dereferenced below, so a construction failure must abort
	// the test instead of merely being recorded.
	stc, err := newCache()
	require.NoError(t, err, "lru cache should start")
	coll.sampleTraceCache = stc

	traceID := "traceABC"

	span := &types.Span{
		TraceID: traceID,
		Event: types.Event{
			Dataset: "aoeu",
			Data: map[string]interface{}{
				"trace.parent_id": "unused",
			},
			APIKey: legacyAPIKey,
		},
	}
	coll.StressRelief = &MockStressReliever{
		IsStressed:              true,
		SampleDeterministically: true,
		ShouldKeep:              true,
		SampleRate:              100,
	}
	processed, kept := coll.ProcessSpanImmediately(span)
	require.True(t, processed)
	require.True(t, kept)

	// The kept decision must be recorded together with the stress relief rate.
	tr, _, found := coll.sampleTraceCache.CheckTrace(traceID)
	require.True(t, found)
	require.NotNil(t, tr)
	assert.Equal(t, uint(100), tr.Rate())

	// Inspect the transmitted events under the read lock. The closure lets the
	// lock be released via defer even when require aborts the test, and
	// require.Len guards the index access so a missing event fails cleanly
	// rather than panicking with an out-of-range index.
	func() {
		transmission.Mux.RLock()
		defer transmission.Mux.RUnlock()
		require.Len(t, transmission.Events, 1, "span should immediately be sent during stress relief")
		assert.Equal(t, uint(100), transmission.Events[0].SampleRate)
	}()

	rootSpan := &types.Span{
		TraceID: traceID,
		Event: types.Event{
			Dataset:    "aoeu",
			Data:       map[string]interface{}{},
			APIKey:     legacyAPIKey,
			SampleRate: 10,
		},
	}

	processed2, kept2 := coll.ProcessSpanImmediately(rootSpan)
	require.True(t, processed2)
	require.True(t, kept2)

	// The cached record for the trace must still report the original rate.
	tr2, _, found2 := coll.sampleTraceCache.CheckTrace(traceID)
	require.True(t, found2)
	require.NotNil(t, tr2)
	assert.Equal(t, uint(100), tr2.Rate())

	func() {
		transmission.Mux.RLock()
		defer transmission.Mux.RUnlock()
		require.Len(t, transmission.Events, 2, "span should immediately be sent during stress relief")
		assert.Equal(t, uint(1000), transmission.Events[1].SampleRate)
	}()
}

// TestStressReliefDecorateHostname tests that the span gets decorated with hostname if
// StressReliefMode is active
func TestStressReliefDecorateHostname(t *testing.T) {
Expand Down

0 comments on commit 057badd

Please sign in to comment.