fix: reset redistribution delay on peer membership change #1403

Merged · 8 commits · Nov 6, 2024
154 changes: 31 additions & 123 deletions collect/collect.go
@@ -4,8 +4,6 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand/v2"
"os"
"runtime"
"sort"
@@ -293,7 +291,7 @@ func (i *InMemCollector) checkAlloc(ctx context.Context) {
tracesSent := generics.NewSet[string]()
// Send the traces we can't keep.
for _, trace := range allTraces {
if !i.IsMyTrace(trace.ID()) {
if _, ok := i.IsMyTrace(trace.ID()); !ok {
i.Logger.Debug().WithFields(map[string]interface{}{
"trace_id": trace.ID(),
}).Logf("cannot eject trace that does not belong to this peer")
@@ -573,7 +571,7 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.
traceTimeout := i.Config.GetTracesConfig().GetTraceTimeout()
var orphanTraceCount int
traces := i.cache.TakeExpiredTraces(now, int(i.Config.GetTracesConfig().MaxExpiredTraces), func(t *types.Trace) bool {
if i.IsMyTrace(t.ID()) {
if _, ok := i.IsMyTrace(t.ID()); ok {
return true
}

@@ -640,6 +638,8 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.
TraceID: trace.ID(),
Event: types.Event{
Context: trace.GetSpans()[0].Context,
APIKey: trace.APIKey,
Dataset: trace.Dataset,
},
}, trace, i.Sharder.WhichShard(trace.ID())))
}
@@ -656,6 +656,18 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) {
span.End()
}()

targetShard, isMyTrace := i.IsMyTrace(sp.TraceID)
// if the span is a decision span and the trace no longer belongs to us, we should not forward it to the peer
if !isMyTrace && sp.IsDecisionSpan() {
return
}

// if trace locality is enabled, we should forward all spans to the peer that owns the trace
if i.Config.GetCollectionConfig().EnableTraceLocality && !isMyTrace {
i.PeerTransmission.EnqueueSpan(sp)
return
}

tcfg := i.Config.GetTracesConfig()

trace := i.cache.Get(sp.TraceID)
@@ -712,22 +724,17 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) {
trace.AddSpan(sp)
span.SetAttributes(attribute.String("disposition", "live_trace"))

// Figure out if we should handle this span locally or pass on to a peer
var spanForwarded bool
if !i.Config.GetCollectionConfig().EnableTraceLocality {
// if this trace doesn't belong to us, we should forward a decision span to its decider
targetShard := i.Sharder.WhichShard(trace.ID())
if !targetShard.Equals(i.Sharder.MyShard()) && !sp.IsDecisionSpan() {
i.Metrics.Increment("incoming_router_peer")
i.Logger.Debug().
WithString("peer", targetShard.GetAddress()).
Logf("Sending span to peer")

dc := i.createDecisionSpan(sp, trace, targetShard)

i.PeerTransmission.EnqueueEvent(dc)
spanForwarded = true
}
// if this trace doesn't belong to us and it's not in sent state, we should forward a decision span to its decider
if !trace.Sent && !isMyTrace {
i.Metrics.Increment("incoming_router_peer")
i.Logger.Debug().
Logf("Sending span to peer")

dc := i.createDecisionSpan(sp, trace, targetShard)

i.PeerTransmission.EnqueueEvent(dc)
spanForwarded = true
}

// we may override these values in conditions below
@@ -1329,107 +1336,6 @@ func (i *InMemCollector) sendTraces() {
}
}

type redistributeNotifier struct {
clock clockwork.Clock
logger logger.Logger
initialDelay time.Duration
maxAttempts int
maxDelay time.Duration
metrics metrics.Metrics

reset chan struct{}
done chan struct{}
triggered chan struct{}
once sync.Once
}

func newRedistributeNotifier(logger logger.Logger, met metrics.Metrics, clock clockwork.Clock) *redistributeNotifier {
r := &redistributeNotifier{
initialDelay: 3 * time.Second,
maxDelay: 30 * time.Second,
maxAttempts: 5,
done: make(chan struct{}),
clock: clock,
logger: logger,
metrics: met,
triggered: make(chan struct{}),
reset: make(chan struct{}),
}

return r
}

func (r *redistributeNotifier) Notify() <-chan struct{} {
return r.triggered
}

func (r *redistributeNotifier) Reset() {
var started bool
r.once.Do(func() {
go r.run()
started = true
})

if started {
return
}

select {
case r.reset <- struct{}{}:
case <-r.done:
return
default:
r.logger.Debug().Logf("A trace redistribution is ongoing. Ignoring reset.")
}
}

func (r *redistributeNotifier) Stop() {
close(r.done)
}

func (r *redistributeNotifier) run() {
var attempts int
lastBackoff := r.initialDelay
for {
// if we've reached the max attempts, reset the backoff and attempts
// only when the reset signal is received.
if attempts >= r.maxAttempts {
r.metrics.Gauge("trace_redistribution_count", 0)
<-r.reset
lastBackoff = r.initialDelay
attempts = 0
}
select {
case <-r.done:
return
case r.triggered <- struct{}{}:
}

attempts++
r.metrics.Gauge("trace_redistribution_count", attempts)

// Calculate the backoff interval using exponential backoff with a base time.
backoff := time.Duration(math.Min(float64(lastBackoff)*2, float64(r.maxDelay)))
// Add jitter to the backoff to avoid retry collisions.
jitter := time.Duration(rand.Float64() * float64(backoff) * 0.5)
nextBackoff := backoff + jitter
lastBackoff = nextBackoff

timer := r.clock.NewTimer(nextBackoff)
select {
case <-timer.Chan():
timer.Stop()
case <-r.reset:
lastBackoff = r.initialDelay
attempts = 0
timer.Stop()
case <-r.done:
timer.Stop()
return
}
}
}

func (i *InMemCollector) signalKeptTraceDecisions(ctx context.Context, msg string) {
if len(msg) == 0 {
return
@@ -1588,13 +1494,15 @@ func (i *InMemCollector) makeDecision(trace *types.Trace, sendReason string) (*T
return &td, nil
}

func (i *InMemCollector) IsMyTrace(traceID string) bool {
func (i *InMemCollector) IsMyTrace(traceID string) (sharder.Shard, bool) {
// if trace locality is enabled, we should always process the trace
if i.Config.GetCollectionConfig().EnableTraceLocality {
return true
return i.Sharder.MyShard(), true
}

return i.Sharder.WhichShard(traceID).Equals(i.Sharder.MyShard())
targetShard := i.Sharder.WhichShard(traceID)

return targetShard, i.Sharder.MyShard().Equals(targetShard)
}

func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecision) {
…
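
A note on the new IsMyTrace contract: it now returns the owning shard along with ownership, so callers such as processSpan can forward to that shard without a second WhichShard lookup. Below is a minimal, self-contained sketch of the shape of that contract; the Shard and collector types are simplified stand-ins for refinery's sharder interfaces, not the real ones.

package main

import "fmt"

// Simplified stand-ins for refinery's sharder types, used only to
// illustrate the shape of the new IsMyTrace contract.
type Shard struct{ Addr string }

func (s Shard) Equals(o Shard) bool { return s.Addr == o.Addr }

type collector struct {
	myShard    Shard
	whichShard func(traceID string) Shard
	localityOn bool
}

// IsMyTrace returns the owning shard alongside ownership, so a caller
// that needs to forward a span can reuse the shard without a second lookup.
func (c *collector) IsMyTrace(traceID string) (Shard, bool) {
	if c.localityOn {
		return c.myShard, true
	}
	target := c.whichShard(traceID)
	return target, c.myShard.Equals(target)
}

func main() {
	c := &collector{
		myShard:    Shard{Addr: "10.0.0.1:8081"},
		whichShard: func(id string) Shard { return Shard{Addr: "10.0.0.2:8081"} },
	}
	if shard, ok := c.IsMyTrace("abc123"); !ok {
		fmt.Println("forwarding to", shard.Addr)
	}
}
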
21 changes: 17 additions & 4 deletions collect/collect_test.go
@@ -57,6 +57,8 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission, pe
Metrics: s,
}
localPubSub.Start()
redistributeNotifier := newRedistributeNotifier(&logger.NullLogger{}, &metrics.NullMetrics{}, clock)
redistributeNotifier.initialDelay = 2 * time.Millisecond

c := &InMemCollector{
Config: conf,
@@ -90,7 +92,7 @@
TraceIDs: peerTraceIDs,
},
},
redistributeTimer: newRedistributeNotifier(&logger.NullLogger{}, &metrics.NullMetrics{}, clock),
redistributeTimer: redistributeNotifier,
}

if !conf.GetCollectionConfig().EnableTraceLocality {
@@ -1748,9 +1750,20 @@ func TestRedistributeTraces(t *testing.T) {
}

coll.Sharder = s
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := newCache()
assert.NoError(t, err, "lru cache should start")
coll.sampleTraceCache = stc

go coll.collect()
go coll.sendTraces()

err := coll.Start()
assert.NoError(t, err)
defer coll.Stop()

dataset := "aoeu"
@@ -1803,7 +1816,7 @@
coll.mutex.Lock()
coll.cache.Set(trace)
coll.mutex.Unlock()
coll.Peers.RegisterUpdatedPeersCallback(coll.redistributeTimer.Reset)
coll.redistributeTimer.Reset()

peerEvents := peerTransmission.GetBlock(1)
assert.Len(t, peerEvents, 1)
…
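
The updated test builds the notifier with a 2ms initial delay and drives it via the injected clock, rather than going through the peer callback. As a minimal sketch of the fake-clock pattern the test relies on (assuming clockwork's FakeClock API; the timer and message here are illustrative, not from the test):

package main

import (
	"fmt"
	"time"

	"github.com/jonboulle/clockwork"
)

func main() {
	clock := clockwork.NewFakeClock()
	timer := clock.NewTimer(2 * time.Millisecond)

	done := make(chan struct{})
	go func() {
		// Fires only once the fake clock is advanced; no real sleeping.
		<-timer.Chan()
		fmt.Println("redistribution delay elapsed")
		close(done)
	}()

	// Deterministically move time forward past the delay.
	clock.Advance(2 * time.Millisecond)
	<-done
}
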
111 changes: 111 additions & 0 deletions collect/trace_redistributer.go
@@ -0,0 +1,111 @@
package collect

import (
"math/rand/v2"
"sync"
"time"

"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/jonboulle/clockwork"
)

type redistributeNotifier struct {
clock clockwork.Clock
logger logger.Logger
initialDelay time.Duration
maxDelay float64
metrics metrics.Metrics

reset chan struct{}
done chan struct{}
triggered chan struct{}
once sync.Once
}

func newRedistributeNotifier(logger logger.Logger, met metrics.Metrics, clock clockwork.Clock) *redistributeNotifier {
r := &redistributeNotifier{
initialDelay: 3 * time.Second,
maxDelay: float64(30 * time.Second),
done: make(chan struct{}),
clock: clock,
logger: logger,
metrics: met,
triggered: make(chan struct{}),
reset: make(chan struct{}),
}

return r
}

func (r *redistributeNotifier) Notify() <-chan struct{} {
return r.triggered
}

func (r *redistributeNotifier) Reset() {
var started bool
r.once.Do(func() {
go r.run()
started = true
})

if started {
return
}

select {
case r.reset <- struct{}{}:
case <-r.done:
return
default:
r.logger.Debug().Logf("A trace redistribution is ongoing. Ignoring reset.")
}
}

func (r *redistributeNotifier) Stop() {
close(r.done)
}

// run runs the redistribution notifier loop.
// It notifies the triggered channel when it's time to redistribute traces, which we want
// to happen when peer membership changes. We don't trigger immediately, because
// membership changes often happen in bunches, so we wait out a delay before
// triggering the redistribution.
func (r *redistributeNotifier) run() {
currentDelay := r.calculateDelay(r.initialDelay)

// start a back off timer with the initial delay
timer := r.clock.NewTimer(currentDelay)
for {
select {
case <-r.done:
timer.Stop()
return
case <-r.reset:
// reset the delay timer when we receive a reset signal.
currentDelay = r.calculateDelay(r.initialDelay)
if !timer.Stop() {
// drain the timer channel
select {
case <-timer.Chan():
default:
}
}
timer.Reset(currentDelay)
case <-timer.Chan():
select {
case <-r.done:
return
case r.triggered <- struct{}{}:
}
}
}
}

// calculateDelay computes the delay before the next redistribution cycle.
// It adds jitter to the base delay to avoid collisions when multiple peers redistribute at once.
func (r *redistributeNotifier) calculateDelay(currentDelay time.Duration) time.Duration {
// Add jitter to the backoff to avoid retry collisions.
jitter := time.Duration(rand.Float64() * float64(currentDelay) * 0.5)
return currentDelay + jitter
}
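
For context, here is a hedged sketch of how the notifier is meant to be wired up: peer-membership callbacks call Reset (the first call also starts the loop), and the collector selects on Notify. The surrounding function and its parameters are hypothetical; only newRedistributeNotifier, Reset, Notify, and Stop come from the file above.

package collect

import (
	"github.com/honeycombio/refinery/logger"
	"github.com/honeycombio/refinery/metrics"
	"github.com/jonboulle/clockwork"
)

// runRedistributionLoop is a hypothetical consumer of the notifier.
// registerPeersCallback stands in for Peers.RegisterUpdatedPeersCallback.
func runRedistributionLoop(registerPeersCallback func(func()), redistribute func(), shutdown <-chan struct{}) {
	r := newRedistributeNotifier(&logger.NullLogger{}, &metrics.NullMetrics{}, clockwork.NewRealClock())
	defer r.Stop()

	// Each membership change restarts the delay instead of queuing another
	// trigger, so a burst of changes yields a single redistribution.
	registerPeersCallback(r.Reset)

	for {
		select {
		case <-r.Notify():
			redistribute() // re-home cached traces onto their new owners
		case <-shutdown:
			return
		}
	}
}
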