diff --git a/collect/cache/cache.go b/collect/cache/cache.go index 3758969878..170083d691 100644 --- a/collect/cache/cache.go +++ b/collect/cache/cache.go @@ -16,7 +16,7 @@ import (
 
 type Cache interface {
-	// Set adds the trace to the cache. If it is kicking out a trace from the cache
-	// that has not yet been sent, it will return that trace. Otherwise returns nil.
-	Set(trace *types.Trace) *types.Trace
+	// Set adds the trace to the cache, replacing any existing entry
+	// with the same trace ID.
+	Set(trace *types.Trace)
 	Get(traceID string) *types.Trace
 	// GetAll is used during shutdown to get all in-flight traces to flush them
@@ -90,18 +90,17 @@ func (d *DefaultInMemCache) GetCacheEntryCount() int {
 	return len(d.cache)
 }
-func (d *DefaultInMemCache) Set(trace *types.Trace) *types.Trace {
+func (d *DefaultInMemCache) Set(trace *types.Trace) {
 	// we need to dereference the trace ID so skip bad inserts to avoid panic
 	if trace == nil {
-		return nil
+		return
 	}
 
 	start := time.Now()
-	defer d.Metrics.Histogram("trace_cache_set_dur_ms", time.Since(start).Milliseconds())
+	defer func() { d.Metrics.Histogram("trace_cache_set_dur_ms", float64(time.Since(start).Microseconds())/1000.0) }()
 
 	// update the cache and priority queue
 	d.cache[trace.TraceID] = trace
 	d.pq.Set(trace.TraceID, trace.SendBy)
-	return nil
 }
 
 func (d *DefaultInMemCache) Get(traceID string) *types.Trace {
@@ -113,7 +112,7 @@ func (d *DefaultInMemCache) Get(traceID string) *types.Trace {
 
 func (d *DefaultInMemCache) GetAll() []*types.Trace {
 	start := time.Now()
-	defer d.Metrics.Histogram("trace_cache_get_all_dur_ms", time.Since(start).Milliseconds())
+	defer func() { d.Metrics.Histogram("trace_cache_get_all_dur_ms", float64(time.Since(start).Microseconds())/1000.0) }()
 	return maps.Values(d.cache)
 }
 
@@ -128,7 +127,7 @@ func (d *DefaultInMemCache) TakeExpiredTraces(now time.Time, max int, filter fun
 	d.Metrics.Histogram("collect_cache_entries", float64(len(d.cache)))
 
 	start := time.Now()
-	defer d.Metrics.Histogram("trace_cache_take_expired_traces_dur_ms", time.Since(start).Milliseconds())
+	defer func() { d.Metrics.Histogram("trace_cache_take_expired_traces_dur_ms", float64(time.Since(start).Microseconds())/1000.0) }()
 
 	var expired, skipped []*types.Trace
 	for !d.pq.IsEmpty() && (max <= 0 || len(expired) < max) {
@@ -174,7 +173,7 @@ func (d *DefaultInMemCache) TakeExpiredTraces(now time.Time, max int, filter fun
 func (d *DefaultInMemCache) RemoveTraces(toDelete generics.Set[string]) {
 	d.Metrics.Histogram("collect_cache_entries", float64(len(d.cache)))
 	start := time.Now()
-	defer d.Metrics.Histogram("trace_cache_remove_traces_dur_ms", time.Since(start).Milliseconds())
+	defer func() { d.Metrics.Histogram("trace_cache_remove_traces_dur_ms", float64(time.Since(start).Microseconds())/1000.0) }()
 
 	for _, traceID := range toDelete.Members() {
 		delete(d.cache, traceID)
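Note on the duration histograms above: in Go, the arguments of a deferred call are evaluated at the `defer` statement itself, and dividing an `int64` microsecond count by the constant `1000.0` still performs integer division. Both pitfalls are why the replacement lines wrap the call in a closure and convert to `float64` first. A minimal standalone sketch (hypothetical `timedWrong`/`timedRight` helpers, not Refinery code) that demonstrates the difference:

```go
package main

import (
	"fmt"
	"time"
)

func work() { time.Sleep(25 * time.Millisecond) }

// wrong: the argument to a deferred call is evaluated when the defer
// statement runs, so this always records ~0ms.
func timedWrong() {
	start := time.Now()
	defer fmt.Printf("wrong: %.3fms\n", float64(time.Since(start).Microseconds())/1000.0)
	work()
}

// right: the closure delays evaluation until the function returns, and
// float64(...)/1000.0 keeps sub-millisecond precision (integer division
// would truncate back to whole milliseconds).
func timedRight() {
	start := time.Now()
	defer func() {
		fmt.Printf("right: %.3fms\n", float64(time.Since(start).Microseconds())/1000.0)
	}()
	work()
}

func main() {
	timedWrong() // prints ~0.000ms
	timedRight() // prints ~25.xxxms
}
```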
diff --git a/collect/collect.go b/collect/collect.go index 71fd1c663e..17c6412654 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -156,6 +156,8 @@ var inMemCollectorMetrics = []metrics.Metadata{
 	{Name: "collector_collect_loop_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "duration of the collect loop, the primary event processing goroutine"},
 	{Name: "collector_outgoing_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of traces waiting to be send to upstream"},
 	{Name: "collector_drop_decision_batch_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of drop decisions sent in a batch"},
+	{Name: "collector_expired_traces_missing_decisions", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of decision spans forwarded for expired traces that are still missing a trace decision"},
+	{Name: "collector_expired_traces_orphans", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of expired traces that were sent without ever receiving a trace decision"},
 }
 
 func (i *InMemCollector) Start() error {
@@ -291,6 +293,13 @@ func (i *InMemCollector) checkAlloc(ctx context.Context) {
 	tracesSent := generics.NewSet[string]()
 	// Send the traces we can't keep.
 	for _, trace := range allTraces {
+		if !i.IsMyTrace(trace.ID()) {
+			i.Logger.Debug().WithFields(map[string]interface{}{
+				"trace_id": trace.ID(),
+			}).Logf("cannot eject trace that does not belong to this peer")
+
+			continue
+		}
 		td, err := i.makeDecision(trace, TraceSendEjectedMemsize)
 		if err != nil {
 			continue
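The `checkAlloc` hunk above now skips traces owned by another peer, because only a trace's owner is allowed to make a sampling decision for it. Ownership comes down to every peer deterministically mapping a trace ID to the same shard. A minimal sketch of that idea, assuming a toy static peer list and FNV hashing rather than Refinery's actual `Sharder`:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// toy sharder: every peer computes the same owner for a given trace ID,
// so exactly one peer considers itself the decider for that trace.
// Illustrative only; not the real Refinery Sharder interface.
type sharder struct {
	peers []string // must be sorted identically on every peer
	self  string
}

func (s sharder) whichPeer(traceID string) string {
	h := fnv.New32a()
	h.Write([]byte(traceID))
	return s.peers[h.Sum32()%uint32(len(s.peers))]
}

func (s sharder) isMyTrace(traceID string) bool {
	return s.whichPeer(traceID) == s.self
}

func main() {
	s := sharder{peers: []string{"peer-a", "peer-b", "peer-c"}, self: "peer-a"}
	for _, id := range []string{"trace-1", "trace-2", "trace-3"} {
		fmt.Printf("%s -> owner %s (mine: %v)\n", id, s.whichPeer(id), s.isMyTrace(id))
	}
}
```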
@@ -560,11 +569,36 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.
 	defer span.End()
 	startTime := time.Now()
 
+	expiredTraces := make([]*types.Trace, 0)
+	traceTimeout := i.Config.GetTracesConfig().GetTraceTimeout()
+	var orphanTraceCount int
 	traces := i.cache.TakeExpiredTraces(now, int(i.Config.GetTracesConfig().MaxExpiredTraces), func(t *types.Trace) bool {
-		return i.IsMyTrace(t.ID())
+		if i.IsMyTrace(t.ID()) {
+			return true
+		}
+
+		timeoutDuration := now.Sub(t.SendBy)
+		// if a trace has been expired for more than 4 times the trace timeout,
+		// give up on the decider node and make a decision for it locally
+		if timeoutDuration > traceTimeout*4 {
+			orphanTraceCount++
+			return true
+		}
+
+		// if a trace has been expired for more than 2 times the trace timeout,
+		// forward it to its decider and wait for the decision to be published again
+		if timeoutDuration > traceTimeout*2 {
+			expiredTraces = append(expiredTraces, t)
+		}
+
+		// by returning false we keep the trace in the cache; it will be removed
+		// once this peer receives the trace decision
+		return false
 	})
 	dur := time.Now().Sub(startTime)
+	i.Metrics.Gauge("collector_expired_traces_missing_decisions", len(expiredTraces))
+	i.Metrics.Gauge("collector_expired_traces_orphans", orphanTraceCount)
 	span.SetAttributes(
 		attribute.Int("num_traces_to_expire", len(traces)),
 		attribute.Int64("take_expired_traces_duration_ms", dur.Milliseconds()))
 
@@ -574,9 +608,8 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.
 
 	for _, t := range traces {
 		totalSpansSent += int64(t.DescendantCount())
-		_, span2 := otelutil.StartSpanWith(ctx, i.Tracer, "sendReadyTrace", "num_spans", int64(t.DescendantCount()))
+
 		if t.RootSpan != nil {
-			span2.SetAttributes(attribute.String("send_reason", TraceSendGotRoot))
 			td, err := i.makeDecision(t, TraceSendGotRoot)
 			if err != nil {
 				continue
@@ -584,14 +617,12 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.
 			i.send(ctx, t, td)
 		} else {
 			if spanLimit > 0 && t.DescendantCount() > spanLimit {
-				span2.SetAttributes(attribute.String("send_reason", TraceSendSpanLimit))
 				td, err := i.makeDecision(t, TraceSendSpanLimit)
 				if err != nil {
 					continue
 				}
 				i.send(ctx, t, td)
 			} else {
-				span2.SetAttributes(attribute.String("send_reason", TraceSendExpired))
 				td, err := i.makeDecision(t, TraceSendExpired)
 				if err != nil {
 					continue
@@ -599,7 +630,18 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.
 				i.send(ctx, t, td)
 			}
 		}
-		span2.End()
+
+	}
+
+	for _, trace := range expiredTraces {
+		// if a trace has expired and it doesn't belong to this peer, we should ask its decider to
+		// publish the trace decision again
+		i.PeerTransmission.EnqueueEvent(i.createDecisionSpan(&types.Span{
+			TraceID: trace.ID(),
+			Event: types.Event{
+				Context: trace.GetSpans()[0].Context,
+			},
+		}, trace, i.Sharder.WhichShard(trace.ID())))
 	}
 	span.SetAttributes(attribute.Int64("total_spans_sent", totalSpansSent))
 }
@@ -649,14 +691,7 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) {
 	}
 	trace.SetSampleRate(sp.SampleRate) // if it had a sample rate, we want to keep it
-	// push this into the cache and if we eject an unsent trace, send it ASAP
-	ejectedTrace := i.cache.Set(trace)
-	if ejectedTrace != nil {
-		span.SetAttributes(attribute.String("disposition", "ejected_trace"))
-		td, err := i.makeDecision(ejectedTrace, TraceSendEjectedFull)
-		if err == nil {
-			i.send(ctx, ejectedTrace, td)
-		}
-	}
+	// push this into the cache
+	i.cache.Set(trace)
 }
 // if the trace we got back from the cache has already been sent, deal with the
 // span.
@@ -1246,10 +1281,6 @@
 	}
 	dc.APIHost = targetShard.GetAddress()
 
-	i.Logger.Warn().WithFields(map[string]interface{}{
-		"dc": dc,
-		"sp": sp.Data,
-	}).Logf("creating decision span")
 
 	return dc
 }
@@ -1479,15 +1510,6 @@ func (i *InMemCollector) processKeptDecision(msg string) {
 	i.cache.RemoveTraces(toDelete)
 }
 
 func (i *InMemCollector) makeDecision(trace *types.Trace, sendReason string) (*TraceDecision, error) {
-	if !i.IsMyTrace(trace.ID()) {
-		err := errors.New("cannot make a decision for partial traces")
-
-		i.Logger.Warn().WithFields(map[string]interface{}{
-			"trace_id": trace.ID(),
-		}).Logf(err.Error())
-
-		return nil, err
-	}
 	if trace.Sent {
 		return nil, errors.New("trace already sent")
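The closure passed to `TakeExpiredTraces` above implements an escalation ladder for traces this peer does not own: keep waiting at first, ask the decider to republish once the trace is more than 2x `TraceTimeout` overdue, and decide locally (counting it as an orphan) past 4x. A self-contained sketch of that ladder, with a hypothetical `classify` helper and made-up durations:

```go
package main

import (
	"fmt"
	"time"
)

type action int

const (
	keepWaiting   action = iota // decision should arrive shortly
	askDecider                  // overdue: re-request the decision from the owner
	decideLocally               // decider is likely gone: treat as an orphan
)

var names = [...]string{"keepWaiting", "askDecider", "decideLocally"}

// classify mirrors the escalation ladder in sendExpiredTracesInCache.
// Hypothetical helper for illustration, not the actual Refinery API.
func classify(overdue, traceTimeout time.Duration) action {
	switch {
	case overdue > traceTimeout*4:
		return decideLocally
	case overdue > traceTimeout*2:
		return askDecider
	default:
		return keepWaiting
	}
}

func main() {
	timeout := 500 * time.Millisecond
	for _, overdue := range []time.Duration{time.Second, 1500 * time.Millisecond, 3 * time.Second} {
		fmt.Printf("overdue %v -> %s\n", overdue, names[classify(overdue, timeout)])
	}
}
```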
diff --git a/collect/collect_test.go b/collect/collect_test.go index acaae78428..972db401ba 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -2180,3 +2180,72 @@ func TestSendDropDecisions(t *testing.T) {
 
 	<-closed
 }
+
+func TestExpiredTracesCleanup(t *testing.T) {
+	conf := &config.MockConfig{
+		GetTracesConfigVal: config.TracesConfig{
+			SendTicker:   config.Duration(2 * time.Millisecond),
+			SendDelay:    config.Duration(1 * time.Millisecond),
+			TraceTimeout: config.Duration(500 * time.Millisecond),
+			MaxBatchSize: 1500,
+		},
+		GetSamplerTypeVal:    &config.DeterministicSamplerConfig{SampleRate: 1},
+		AddSpanCountToRoot:   true,
+		AddCountsToRoot:      true,
+		ParentIdFieldNames:   []string{"trace.parent_id", "parentId"},
+		AddRuleReasonToTrace: true,
+	}
+
+	transmission := &transmit.MockTransmission{}
+	transmission.Start()
+	defer transmission.Stop()
+	peerTransmission := &transmit.MockTransmission{}
+	peerTransmission.Start()
+	defer peerTransmission.Stop()
+	coll := newTestCollector(conf, transmission, peerTransmission)
+
+	c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
+	coll.cache = c
+	stc, err := newCache()
+	assert.NoError(t, err, "lru cache should start")
+	coll.sampleTraceCache = stc
+
+	coll.incoming = make(chan *types.Span, 5)
+	coll.fromPeer = make(chan *types.Span, 5)
+	coll.outgoingTraces = make(chan sendableTrace, 5)
+	coll.datasetSamplers = make(map[string]sample.Sampler)
+
+	for _, traceID := range peerTraceIDs {
+		trace := &types.Trace{
+			TraceID: traceID,
+			SendBy:  coll.Clock.Now(),
+		}
+		trace.AddSpan(&types.Span{
+			TraceID: trace.ID(),
+			Event: types.Event{
+				Context: context.Background(),
+			},
+		})
+		coll.cache.Set(trace)
+	}
+
+	assert.Eventually(t, func() bool {
+		return coll.cache.GetCacheEntryCount() == len(peerTraceIDs)
+	}, 200*time.Millisecond, 10*time.Millisecond)
+
+	traceTimeout := time.Duration(conf.GetTracesConfig().TraceTimeout)
+	coll.sendExpiredTracesInCache(context.Background(), coll.Clock.Now().Add(3*traceTimeout))
+
+	events := peerTransmission.GetBlock(3)
+	assert.Len(t, events, 3)
+
+	coll.sendExpiredTracesInCache(context.Background(), coll.Clock.Now().Add(5*traceTimeout))
+
+	assert.Eventually(t, func() bool {
+		return len(coll.outgoingTraces) == 3
+	}, 100*time.Millisecond, 10*time.Millisecond)
+
+	// at this point, the expired traces should have been removed from the trace cache
+	assert.Zero(t, coll.cache.GetCacheEntryCount())
+
+}
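One detail that makes `TestExpiredTracesCleanup` deterministic: `sendExpiredTracesInCache` takes `now` as an argument, so the test exercises the 2x and 4x branches by passing shifted timestamps instead of sleeping through real timeouts. A toy sketch of the same pattern, using hypothetical `entry` and `expireBefore` names:

```go
package main

import (
	"fmt"
	"time"
)

// entry pairs an ID with its send-by deadline. Illustrative only.
type entry struct {
	id     string
	sendBy time.Time
}

// expireBefore returns the IDs whose deadline is earlier than now.
// Taking now as a parameter lets a test drive time-based branches
// without sleeping.
func expireBefore(entries []entry, now time.Time) []string {
	var out []string
	for _, e := range entries {
		if now.After(e.sendBy) {
			out = append(out, e.id)
		}
	}
	return out
}

func main() {
	base := time.Now()
	entries := []entry{
		{"t1", base.Add(500 * time.Millisecond)},
		{"t2", base.Add(2 * time.Second)},
	}

	// no sleeping: just move the clock hand forward
	fmt.Println(expireBefore(entries, base))                    // []
	fmt.Println(expireBefore(entries, base.Add(time.Second)))   // [t1]
	fmt.Println(expireBefore(entries, base.Add(3*time.Second))) // [t1 t2]
}
```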