fix: deal with orphan traces in trace cache #1405

Merged: 4 commits, Oct 31, 2024
16 changes: 8 additions & 8 deletions collect/cache/cache.go
@@ -16,7 +16,7 @@ import (
type Cache interface {
    // Set adds the trace to the cache. If it is kicking out a trace from the cache
    // that has not yet been sent, it will return that trace. Otherwise returns nil.
-   Set(trace *types.Trace) *types.Trace
+   Set(trace *types.Trace)
    Get(traceID string) *types.Trace

    // GetAll is used during shutdown to get all in-flight traces to flush them
@@ -90,18 +90,18 @@ func (d *DefaultInMemCache) GetCacheEntryCount() int {
    return len(d.cache)
}

-func (d *DefaultInMemCache) Set(trace *types.Trace) *types.Trace {
+func (d *DefaultInMemCache) Set(trace *types.Trace) {
    // we need to dereference the trace ID so skip bad inserts to avoid panic
    if trace == nil {
-       return nil
+       return
    }
    start := time.Now()

-   defer d.Metrics.Histogram("trace_cache_set_dur_ms", time.Since(start).Milliseconds())
+   defer d.Metrics.Histogram("trace_cache_set_dur_ms", time.Since(start).Microseconds()/1000.0)
    // update the cache and priority queue
    d.cache[trace.TraceID] = trace
    d.pq.Set(trace.TraceID, trace.SendBy)
-   return nil
+   return
}

func (d *DefaultInMemCache) Get(traceID string) *types.Trace {
@@ -113,7 +113,7 @@ func (d *DefaultInMemCache) Get(traceID string) *types.Trace {
func (d *DefaultInMemCache) GetAll() []*types.Trace {
    start := time.Now()

-   defer d.Metrics.Histogram("trace_cache_get_all_dur_ms", time.Since(start).Milliseconds())
+   defer d.Metrics.Histogram("trace_cache_get_all_dur_ms", time.Since(start).Microseconds()/1000.0)
    return maps.Values(d.cache)
}

@@ -128,7 +128,7 @@ func (d *DefaultInMemCache) TakeExpiredTraces(now time.Time, max int, filter func(*types.Trace) bool) []*types.Trace {
    d.Metrics.Histogram("collect_cache_entries", float64(len(d.cache)))

    start := time.Now()
-   defer d.Metrics.Histogram("trace_cache_take_expired_traces_dur_ms", time.Since(start).Milliseconds())
+   defer d.Metrics.Histogram("trace_cache_take_expired_traces_dur_ms", time.Since(start).Microseconds()/1000.0)

    var expired, skipped []*types.Trace
    for !d.pq.IsEmpty() && (max <= 0 || len(expired) < max) {
@@ -174,7 +174,7 @@ func (d *DefaultInMemCache) TakeExpiredTraces(now time.Time, max int, filter func(*types.Trace) bool) []*types.Trace {
func (d *DefaultInMemCache) RemoveTraces(toDelete generics.Set[string]) {
    d.Metrics.Histogram("collect_cache_entries", float64(len(d.cache)))
    start := time.Now()
-   defer d.Metrics.Histogram("trace_cache_remove_traces_dur_ms", time.Since(start).Milliseconds())
+   defer d.Metrics.Histogram("trace_cache_remove_traces_dur_ms", time.Since(start).Microseconds()/1000.0)

    for _, traceID := range toDelete.Members() {
        delete(d.cache, traceID)
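A note on the duration change repeated throughout this file: in Go, time.Since(start).Microseconds()/1000.0 is still integer division, because the untyped constant 1000.0 is converted to int64, the type of the left operand (the unconverted expression only compiles at all assuming Metrics.Histogram takes its observation as an interface{}). A minimal sketch of the difference:

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    d := 2500 * time.Microsecond // 2.5ms

    // int64 divided by an untyped float constant: the constant becomes
    // int64(1000), so this truncates to whole milliseconds and prints 2.
    fmt.Println(d.Microseconds() / 1000.0)

    // Converting to float64 first keeps the fractional part and prints 2.5.
    fmt.Println(float64(d.Microseconds()) / 1000.0)
}
```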
76 changes: 49 additions & 27 deletions collect/collect.go
@@ -156,6 +156,8 @@ var inMemCollectorMetrics = []metrics.Metadata{
    {Name: "collector_collect_loop_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "duration of the collect loop, the primary event processing goroutine"},
    {Name: "collector_outgoing_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of traces waiting to be sent upstream"},
    {Name: "collector_drop_decision_batch_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of drop decisions sent in a batch"},
+   {Name: "collector_expired_traces_missing_decisions", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of decision spans forwarded for expired traces missing trace decision"},
+   {Name: "collector_expired_traces_orphans", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of expired traces missing trace decision when they are sent"},
}

func (i *InMemCollector) Start() error {
@@ -291,6 +293,13 @@ func (i *InMemCollector) checkAlloc(ctx context.Context) {
    tracesSent := generics.NewSet[string]()
    // Send the traces we can't keep.
    for _, trace := range allTraces {
+       if !i.IsMyTrace(trace.ID()) {
+           i.Logger.Debug().WithFields(map[string]interface{}{
+               "trace_id": trace.ID(),
+           }).Logf("cannot eject trace that does not belong to this peer")
+
+           continue
+       }
        td, err := i.makeDecision(trace, TraceSendEjectedMemsize)
        if err != nil {
            continue
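The IsMyTrace check used above (and in the expiry logic below) is not part of this diff. A plausible sketch, assuming the Sharder interface exposes MyShard and WhichShard and that shards compare with Equals:

```go
// Sketch under the stated assumptions: this peer owns a trace when the
// consistent-hash shard for its trace ID is the local node.
func (i *InMemCollector) IsMyTrace(traceID string) bool {
    return i.Sharder.MyShard().Equals(i.Sharder.WhichShard(traceID))
}
```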
@@ -560,11 +569,36 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.Time) {
    defer span.End()

    startTime := time.Now()
+   expiredTraces := make([]*types.Trace, 0)
+   traceTimeout := i.Config.GetTracesConfig().GetTraceTimeout()
+   var orphanTraceCount int
    traces := i.cache.TakeExpiredTraces(now, int(i.Config.GetTracesConfig().MaxExpiredTraces), func(t *types.Trace) bool {
-       return i.IsMyTrace(t.ID())
+       if i.IsMyTrace(t.ID()) {
+           return true
+       }
+
+       timeoutDuration := now.Sub(t.SendBy)
+       // if a trace has expired more than 4 times the trace timeout, we should just make a decision for it
+       // instead of waiting for the decider node
+       if timeoutDuration > traceTimeout*4 {
+           orphanTraceCount++
+           return true
+       }
+
+       // if a trace has expired more than 2 times the trace timeout, we should forward it to its decider
+       // and wait for the decider to publish the trace decision again
+       if timeoutDuration > traceTimeout*2 {
+           expiredTraces = append(expiredTraces, t)
+       }
+
+       // by returning false we will not remove the trace from the cache;
+       // the trace will be removed from the cache when the peer receives the trace decision
+       return false
    })

    dur := time.Now().Sub(startTime)
+   i.Metrics.Gauge("collector_expired_traces_missing_decisions", len(expiredTraces))
+   i.Metrics.Gauge("collector_expired_traces_orphans", orphanTraceCount)

    span.SetAttributes(attribute.Int("num_traces_to_expire", len(traces)), attribute.Int64("take_expired_traces_duration_ms", dur.Milliseconds()))

@@ -574,32 +608,40 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.Time) {

    for _, t := range traces {
        totalSpansSent += int64(t.DescendantCount())
        _, span2 := otelutil.StartSpanWith(ctx, i.Tracer, "sendReadyTrace", "num_spans", int64(t.DescendantCount()))

        if t.RootSpan != nil {
            span2.SetAttributes(attribute.String("send_reason", TraceSendGotRoot))
            td, err := i.makeDecision(t, TraceSendGotRoot)
            if err != nil {
                continue
            }
            i.send(ctx, t, td)
        } else {
            if spanLimit > 0 && t.DescendantCount() > spanLimit {
                span2.SetAttributes(attribute.String("send_reason", TraceSendSpanLimit))
                td, err := i.makeDecision(t, TraceSendSpanLimit)
                if err != nil {
                    continue
                }
                i.send(ctx, t, td)
            } else {
                span2.SetAttributes(attribute.String("send_reason", TraceSendExpired))
                td, err := i.makeDecision(t, TraceSendExpired)
                if err != nil {
                    continue
                }
                i.send(ctx, t, td)
            }
        }
        span2.End()
    }

+   for _, trace := range expiredTraces {
+       // if a trace has expired and it doesn't belong to this peer, we should ask its decider to
+       // publish the trace decision again
+       i.PeerTransmission.EnqueueEvent(i.createDecisionSpan(&types.Span{
+           TraceID: trace.ID(),
+           Event: types.Event{
+               Context: trace.GetSpans()[0].Context,
+           },
+       }, trace, i.Sharder.WhichShard(trace.ID())))
Comment on lines +639 to +644

Contributor:

I'm curious what impact this will have on network traffic.

Contributor Author:

This should be an edge case and we are essentially just sending a trace id. Hopefully it won't have too big of an impact.
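To make that concrete: each re-request is a single decision span with no span payload, so the marginal traffic per orphaned trace is roughly the trace ID plus routing metadata. A sketch of the message built in the hunk above (what createDecisionSpan attaches beyond the target address is an assumption):

```go
// One re-request: a span carrying only the trace ID, wrapped in an event
// addressed at the decider. None of the trace's span data travels with it.
sp := &types.Span{
    TraceID: trace.ID(),
    Event: types.Event{
        Context: trace.GetSpans()[0].Context,
    },
}
ev := i.createDecisionSpan(sp, trace, i.Sharder.WhichShard(trace.ID()))
i.PeerTransmission.EnqueueEvent(ev)
```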

+   }
    span.SetAttributes(attribute.Int64("total_spans_sent", totalSpansSent))
}
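Pulling the thresholds out of the callback above: for an expired trace owned by another peer, the new logic is a three-tier policy. A self-contained restatement (the names here are illustrative, not from the codebase):

```go
package collect

import "time"

type expiryAction int

const (
    keepWaiting   expiryAction = iota // under 2x timeout: leave the trace cached
    askDecider                        // past 2x timeout: ask the owner to republish its decision
    decideLocally                     // past 4x timeout: treat as orphaned and decide here
)

// classifyExpired mirrors the TakeExpiredTraces callback: overdue is measured
// from the trace's SendBy deadline against the configured TraceTimeout.
func classifyExpired(now, sendBy time.Time, traceTimeout time.Duration) expiryAction {
    overdue := now.Sub(sendBy)
    switch {
    case overdue > traceTimeout*4:
        return decideLocally
    case overdue > traceTimeout*2:
        return askDecider
    default:
        return keepWaiting
    }
}
```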
@@ -649,14 +691,7 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) {
        }
        trace.SetSampleRate(sp.SampleRate) // if it had a sample rate, we want to keep it
        // push this into the cache and if we eject an unsent trace, send it ASAP
-       ejectedTrace := i.cache.Set(trace)
-       if ejectedTrace != nil {
-           span.SetAttributes(attribute.String("disposition", "ejected_trace"))
-           td, err := i.makeDecision(ejectedTrace, TraceSendEjectedFull)
-           if err == nil {
-               i.send(ctx, ejectedTrace, td)
-           }
-       }
+       i.cache.Set(trace)
    }
    // if the trace we got back from the cache has already been sent, deal with the
    // span.
@@ -1246,10 +1281,6 @@ func (i *InMemCollector) createDecisionSpan(sp *types.Span, trace *types.Trace, targetShard sharder.Shard) *types.Event {
    }

    dc.APIHost = targetShard.GetAddress()
-   i.Logger.Warn().WithFields(map[string]interface{}{
-       "dc": dc,
-       "sp": sp.Data,
-   }).Logf("creating decision span")
    return dc
}
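For orientation, only the tail of createDecisionSpan is visible here: it builds a compact *types.Event and stamps it with the decider's address. A rough sketch with the unseen middle filled in by assumption:

```go
// Sketch only: which metadata fields the real function copies is an
// assumption; the diff confirms just the APIHost stamp and the return.
func (i *InMemCollector) createDecisionSpan(sp *types.Span, trace *types.Trace, targetShard sharder.Shard) *types.Event {
    dc := &types.Event{
        Context: sp.Context,
        APIKey:  sp.APIKey,
        Dataset: sp.Dataset,
        // decision-relevant metadata (e.g. span counts) assumed here
    }
    dc.APIHost = targetShard.GetAddress()
    return dc
}
```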

@@ -1479,15 +1510,6 @@ func (i *InMemCollector) processKeptDecision(msg string) {
    i.cache.RemoveTraces(toDelete)
}
func (i *InMemCollector) makeDecision(trace *types.Trace, sendReason string) (*TraceDecision, error) {
-   if !i.IsMyTrace(trace.ID()) {
-       err := errors.New("cannot make a decision for partial traces")
-
-       i.Logger.Warn().WithFields(map[string]interface{}{
-           "trace_id": trace.ID(),
-       }).Logf(err.Error())
-
-       return nil, err
-   }

    if trace.Sent {
        return nil, errors.New("trace already sent")
69 changes: 69 additions & 0 deletions collect/collect_test.go
@@ -2180,3 +2180,72 @@ func TestSendDropDecisions(t *testing.T) {

    <-closed
}

func TestExpiredTracesCleanup(t *testing.T) {
    conf := &config.MockConfig{
        GetTracesConfigVal: config.TracesConfig{
            SendTicker:   config.Duration(2 * time.Millisecond),
            SendDelay:    config.Duration(1 * time.Millisecond),
            TraceTimeout: config.Duration(500 * time.Millisecond),
            MaxBatchSize: 1500,
        },
        GetSamplerTypeVal:    &config.DeterministicSamplerConfig{SampleRate: 1},
        AddSpanCountToRoot:   true,
        AddCountsToRoot:      true,
        ParentIdFieldNames:   []string{"trace.parent_id", "parentId"},
        AddRuleReasonToTrace: true,
    }

    transmission := &transmit.MockTransmission{}
    transmission.Start()
    defer transmission.Stop()
    peerTransmission := &transmit.MockTransmission{}
    peerTransmission.Start()
    defer peerTransmission.Stop()
    coll := newTestCollector(conf, transmission, peerTransmission)

    c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
    coll.cache = c
    stc, err := newCache()
    assert.NoError(t, err, "lru cache should start")
    coll.sampleTraceCache = stc

    coll.incoming = make(chan *types.Span, 5)
    coll.fromPeer = make(chan *types.Span, 5)
    coll.outgoingTraces = make(chan sendableTrace, 5)
    coll.datasetSamplers = make(map[string]sample.Sampler)

    // peerTraceIDs (defined elsewhere in this file) holds trace IDs that the
    // test sharder assigns to a different peer, so this node is not their decider.
    for _, traceID := range peerTraceIDs {
        trace := &types.Trace{
            TraceID: traceID,
            SendBy:  coll.Clock.Now(),
        }
        trace.AddSpan(&types.Span{
            TraceID: trace.ID(),
            Event: types.Event{
                Context: context.Background(),
            },
        })
        coll.cache.Set(trace)
    }

    assert.Eventually(t, func() bool {
        return coll.cache.GetCacheEntryCount() == len(peerTraceIDs)
    }, 200*time.Millisecond, 10*time.Millisecond)

    // At 3x the trace timeout the traces sit between the 2x and 4x thresholds:
    // they should be forwarded to their decider but kept in the cache.
    traceTimeout := time.Duration(conf.GetTracesConfig().TraceTimeout)
    coll.sendExpiredTracesInCache(context.Background(), coll.Clock.Now().Add(3*traceTimeout))

    events := peerTransmission.GetBlock(3)
    assert.Len(t, events, 3)

    // At 5x the trace timeout the traces are past the 4x threshold: they are
    // treated as orphans and this peer makes the decision itself.
    coll.sendExpiredTracesInCache(context.Background(), coll.Clock.Now().Add(5*traceTimeout))

    assert.Eventually(t, func() bool {
        return len(coll.outgoingTraces) == 3
    }, 100*time.Millisecond, 10*time.Millisecond)

    // at this point, the expired traces should have been removed from the trace cache
    assert.Zero(t, coll.cache.GetCacheEntryCount())
}